add feature gates and a page-size flag for paginated lists
Signed-off-by: Iceber Gu <[email protected]>
Iceber committed Nov 17, 2023
1 parent e6dfb90 commit 202bb30
Showing 8 changed files with 121 additions and 55 deletions.
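
In short, this commit threads a configurable list page size and two pagination-related feature gates from the clustersynchro-manager options down to each resource informer. A rough usage sketch (the --page-size flag is added in this commit; the --feature-gates flag is assumed to follow the usual component-base convention, and the value 500 is purely illustrative):

    clustersynchro-manager \
      --page-size=500 \
      --feature-gates=ForcePaginatedListForResourceSync=true,StreamHandlePaginatedListForResourceSync=true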
3 changes: 2 additions & 1 deletion cmd/binding-apiserver/app/binding_apiserver.go
@@ -19,6 +19,7 @@ import (
"github.com/clusterpedia-io/clusterpedia/pkg/generated/clientset/versioned"
"github.com/clusterpedia-io/clusterpedia/pkg/storage"
"github.com/clusterpedia-io/clusterpedia/pkg/synchromanager"
"github.com/clusterpedia-io/clusterpedia/pkg/synchromanager/clustersynchro"
clusterpediafeature "github.com/clusterpedia-io/clusterpedia/pkg/utils/feature"
"github.com/clusterpedia-io/clusterpedia/pkg/version/verflag"
)
@@ -56,7 +57,7 @@ func NewClusterPediaServerCommand(ctx context.Context) *cobra.Command {
return err
}

synchromanager := synchromanager.NewManager(crdclient, config.StorageFactory, nil)
synchromanager := synchromanager.NewManager(crdclient, config.StorageFactory, clustersynchro.ClusterSyncConfig{})
go synchromanager.Run(1, ctx.Done())

server, err := completedConfig.New()
3 changes: 2 additions & 1 deletion cmd/clustersynchro-manager/app/config/config.go
@@ -9,6 +9,7 @@ import (
kubestatemetrics "github.com/clusterpedia-io/clusterpedia/pkg/kube_state_metrics"
metrics "github.com/clusterpedia-io/clusterpedia/pkg/metrics"
"github.com/clusterpedia-io/clusterpedia/pkg/storage"
"github.com/clusterpedia-io/clusterpedia/pkg/synchromanager/clustersynchro"
)

type Config struct {
@@ -19,8 +20,8 @@ type Config struct {
WorkerNumber int
MetricsServerConfig metrics.Config
KubeMetricsServerConfig *kubestatemetrics.ServerConfig
MetricsStoreBuilder *kubestatemetrics.MetricsStoreBuilder
StorageFactory storage.StorageFactory
ClusterSyncConfig clustersynchro.ClusterSyncConfig

LeaderElection componentbaseconfig.LeaderElectionConfiguration
ClientConnection componentbaseconfig.ClientConnectionConfiguration
13 changes: 11 additions & 2 deletions cmd/clustersynchro-manager/app/options/options.go
@@ -26,6 +26,7 @@ import (
"github.com/clusterpedia-io/clusterpedia/pkg/metrics"
"github.com/clusterpedia-io/clusterpedia/pkg/storage"
storageoptions "github.com/clusterpedia-io/clusterpedia/pkg/storage/options"
"github.com/clusterpedia-io/clusterpedia/pkg/synchromanager/clustersynchro"
)

const (
@@ -44,7 +45,8 @@ type Options struct {
Metrics *metrics.Options
KubeStateMetrics *kubestatemetrics.Options

WorkerNumber int // WorkerNumber is the number of worker goroutines
WorkerNumber int // WorkerNumber is the number of worker goroutines
PageSizeForResourceSync int64
}

func NewClusterSynchroManagerOptions() (*Options, error) {
@@ -89,6 +91,9 @@ func (o *Options) Flags() cliflag.NamedFlagSets {
genericfs.Int32Var(&o.ClientConnection.Burst, "kube-api-burst", o.ClientConnection.Burst, "Burst to use while talking with kubernetes apiserver.")
genericfs.IntVar(&o.WorkerNumber, "worker-number", o.WorkerNumber, "The number of worker goroutines.")

syncfs := fss.FlagSet("resource sync")
syncfs.Int64Var(&o.PageSizeForResourceSync, "page-size", o.PageSizeForResourceSync, "The requested chunk size of initial and resync watch lists for resource sync")

options.BindLeaderElectionFlags(&o.LeaderElection, genericfs)

fs := fss.FlagSet("misc")
@@ -165,7 +170,11 @@ func (o *Options) Config() (*config.Config, error) {

MetricsServerConfig: metricsConfig,
KubeMetricsServerConfig: kubeStateMetricsServerConfig,
MetricsStoreBuilder: metricsStoreBuilder,

ClusterSyncConfig: clustersynchro.ClusterSyncConfig{
MetricsStoreBuilder: metricsStoreBuilder,
PageSizeForResourceSync: o.PageSizeForResourceSync,
},

LeaderElection: o.LeaderElection,
}, nil
2 changes: 1 addition & 1 deletion cmd/clustersynchro-manager/app/synchro.go
@@ -85,7 +85,7 @@ func NewClusterSynchroManagerCommand(ctx context.Context) *cobra.Command {
}

func Run(ctx context.Context, c *config.Config) error {
synchromanager := synchromanager.NewManager(c.CRDClient, c.StorageFactory, c.MetricsStoreBuilder)
synchromanager := synchromanager.NewManager(c.CRDClient, c.StorageFactory, c.ClusterSyncConfig)

go func() {
metrics.RunServer(c.MetricsServerConfig)
36 changes: 21 additions & 15 deletions pkg/synchromanager/clustersynchro/cluster_synchro.go
@@ -25,14 +25,19 @@ import (
clusterpediafeature "github.com/clusterpedia-io/clusterpedia/pkg/utils/feature"
)

type ClusterSyncConfig struct {
MetricsStoreBuilder *kubestatemetrics.MetricsStoreBuilder
PageSizeForResourceSync int64
}

type ClusterSynchro struct {
name string

RESTConfig *rest.Config
ClusterStatusUpdater ClusterStatusUpdater

storage storage.StorageFactory
metricsStoreBuilder *kubestatemetrics.MetricsStoreBuilder
syncConfig ClusterSyncConfig
healthChecker *healthChecker
dynamicDiscovery discovery.DynamicDiscoveryInterface
listerWatcherFactory informer.DynamicListerWatcherFactory
@@ -69,7 +74,7 @@ type ClusterStatusUpdater interface {

type RetryableError error

func New(name string, config *rest.Config, storage storage.StorageFactory, metricsStoreBuilder *kubestatemetrics.MetricsStoreBuilder, updater ClusterStatusUpdater) (*ClusterSynchro, error) {
func New(name string, config *rest.Config, storage storage.StorageFactory, updater ClusterStatusUpdater, syncConfig ClusterSyncConfig) (*ClusterSynchro, error) {
dynamicDiscovery, err := discovery.NewDynamicDiscoveryManager(name, config)
if err != nil {
return nil, RetryableError(fmt.Errorf("failed to create dynamic discovery manager: %w", err))
@@ -103,6 +108,7 @@ func New(name string, config *rest.Config, storage storage.StorageFactory, metri
ClusterStatusUpdater: updater,
storage: storage,

syncConfig: syncConfig,
healthChecker: healthChecker,
dynamicDiscovery: dynamicDiscovery,
listerWatcherFactory: listWatchFactory,
@@ -115,8 +121,6 @@ func New(name string, config *rest.Config, storage storage.StorageFactory, metri
stopRunnerCh: make(chan struct{}),

storageResourceVersions: make(map[schema.GroupVersionResource]map[string]interface{}),

metricsStoreBuilder: metricsStoreBuilder,
}

var refresherOnce sync.Once
@@ -352,18 +356,20 @@ func (s *ClusterSynchro) refreshSyncResources() {
}

var metricsStore *kubestatemetrics.MetricsStore
if s.metricsStoreBuilder != nil {
metricsStore = s.metricsStoreBuilder.GetMetricStore(s.name, config.syncResource)
if s.syncConfig.MetricsStoreBuilder != nil {
metricsStore = s.syncConfig.MetricsStoreBuilder.GetMetricStore(s.name, config.syncResource)
}
synchro := newResourceSynchro(
s.name,
config.syncResource,
config.kind,
s.listerWatcherFactory.ForResource(metav1.NamespaceAll, config.syncResource),
rvs,
config.convertor,
resourceStorage,
metricsStore,
synchro := newResourceSynchro(s.name,
ResourceSynchroConfig{
GroupVersionResource: config.syncResource,
Kind: config.kind,
ListerWatcher: s.listerWatcherFactory.ForResource(metav1.NamespaceAll, config.syncResource),
ObjectConvertor: config.convertor,
ResourceStorage: resourceStorage,
MetricsStore: metricsStore,
ResourceVersions: rvs,
PageSizeForInformer: s.syncConfig.PageSizeForResourceSync,
},
)
s.waitGroup.StartWithChannel(s.closer, synchro.Run)
s.storageResourceSynchros.Store(storageGVR, synchro)
84 changes: 57 additions & 27 deletions pkg/synchromanager/clustersynchro/resource_synchro.go
@@ -28,13 +28,32 @@ import (
clusterpediafeature "github.com/clusterpedia-io/clusterpedia/pkg/utils/feature"
)

type ResourceSynchroConfig struct {
schema.GroupVersionResource
Kind string

cache.ListerWatcher
runtime.ObjectConvertor
storage.ResourceStorage

*kubestatemetrics.MetricsStore

ResourceVersions map[string]interface{}
PageSizeForInformer int64
}

func (c ResourceSynchroConfig) GroupVersionKind() schema.GroupVersionKind {
return c.GroupVersionResource.GroupVersion().WithKind(c.Kind)
}

type ResourceSynchro struct {
cluster string

example runtime.Object
syncResource schema.GroupVersionResource
storageResource schema.GroupVersionResource

pageSize int64
listerWatcher cache.ListerWatcher
metricsExtraStore informer.ExtraStore
metricsWriter *metricsstore.MetricsWriter
@@ -69,23 +88,22 @@ type ResourceSynchro struct {
runningStage string
}

func newResourceSynchro(cluster string, syncResource schema.GroupVersionResource, kind string, lw cache.ListerWatcher, rvs map[string]interface{},
convertor runtime.ObjectConvertor, storage storage.ResourceStorage, metricsStore *kubestatemetrics.MetricsStore,
) *ResourceSynchro {
storageConfig := storage.GetStorageConfig()
func newResourceSynchro(cluster string, config ResourceSynchroConfig) *ResourceSynchro {
storageConfig := config.ResourceStorage.GetStorageConfig()
synchro := &ResourceSynchro{
cluster: cluster,
syncResource: syncResource,
syncResource: config.GroupVersionResource,
storageResource: storageConfig.StorageGroupResource.WithVersion(storageConfig.StorageVersion.Version),

listerWatcher: lw,
rvs: rvs,
pageSize: config.PageSizeForInformer,
listerWatcher: config.ListerWatcher,
rvs: config.ResourceVersions,

// all resources saved to the queue are `runtime.Object`
queue: queue.NewPressureQueue(cache.MetaNamespaceKeyFunc),

storage: storage,
convertor: convertor,
storage: config.ResourceStorage,
convertor: config.ObjectConvertor,
memoryVersion: storageConfig.MemoryVersion,

stopped: make(chan struct{}),
@@ -100,12 +118,12 @@ func newResourceSynchro(cluster string, syncResource schema.GroupVersionResource
synchro.ctx, synchro.cancel = context.WithCancel(context.Background())

example := &unstructured.Unstructured{}
example.SetGroupVersionKind(syncResource.GroupVersion().WithKind(kind))
example.SetGroupVersionKind(config.GroupVersionKind())
synchro.example = example

if metricsStore != nil {
synchro.metricsExtraStore = metricsStore
synchro.metricsWriter = metricsstore.NewMetricsWriter(metricsStore.MetricsStore)
if config.MetricsStore != nil {
synchro.metricsExtraStore = config.MetricsStore
synchro.metricsWriter = metricsstore.NewMetricsWriter(config.MetricsStore.MetricsStore)
}

synchro.setStatus(clusterv1alpha2.ResourceSyncStatusPending, "", "")
@@ -244,10 +262,22 @@ func (synchro *ResourceSynchro) Start(stopCh <-chan struct{}) {
synchro.rvsLock.Unlock()
}

informer.NewResourceVersionInformer(
synchro.cluster, synchro.listerWatcher, synchro.cache,
synchro.example, synchro, synchro.ErrorHandler, synchro.metricsExtraStore,
).Run(informerStopCh)
config := informer.InformerConfig{
ListerWatcher: synchro.listerWatcher,
Storage: synchro.cache,
ExampleObject: synchro.example,
Handler: synchro,
ErrorHandler: synchro.ErrorHandler,
ExtraStore: synchro.metricsExtraStore,
WatchListPageSize: synchro.pageSize,
}
if clusterpediafeature.FeatureGate.Enabled(features.StreamHandlePaginatedListForResourceSync) {
config.StreamHandleForPaginatedList = true
}
if clusterpediafeature.FeatureGate.Enabled(features.ForcePaginatedListForResourceSync) {
config.ForcePaginatedList = true
}
informer.NewResourceVersionInformer(synchro.cluster, config).Run(informerStopCh)

// TODO(Iceber): Optimize status updates in case of storage exceptions
if !synchro.isRunnableForStorage.Load() {
@@ -317,18 +347,18 @@ func (synchro *ResourceSynchro) OnDelete(obj interface{}) {
return
}

key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
if err != nil {
return
if d, ok := obj.(cache.DeletedFinalStateUnknown); ok {
if obj, ok = d.Obj.(*unstructured.Unstructured); !ok {
namespace, name, err := cache.SplitMetaNamespaceKey(d.Key)
if err != nil {
return
}
obj = &metav1.PartialObjectMetadata{ObjectMeta: metav1.ObjectMeta{Namespace: namespace, Name: name}}
}
}
namespace, name, err := cache.SplitMetaNamespaceKey(key)
if err != nil {
return
if o, ok := obj.(*unstructured.Unstructured); ok {
synchro.pruneObject(o)
}

// Since it is not necessary to save the complete deleted object to the queue,
// we convert the object to `PartialObjectMetadata`
obj = &metav1.PartialObjectMetadata{ObjectMeta: metav1.ObjectMeta{Namespace: namespace, Name: name}}
_ = synchro.queue.Delete(obj)
}

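For context on what the new page size controls: a paginated list at the Kubernetes API level is a chain of List calls linked by the continue token. The sketch below uses the standard client-go dynamic client and is illustrative only; it is not code from this commit, and the handle callback is a placeholder for the resource handler.

package example

import (
	"context"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/runtime/schema"
	"k8s.io/client-go/dynamic"
)

// listPaginated lists a resource in pages of pageSize, chaining requests with the
// continue token so each page can be handled as soon as it is returned.
func listPaginated(ctx context.Context, client dynamic.Interface, gvr schema.GroupVersionResource, pageSize int64, handle func(obj interface{})) error {
	opts := metav1.ListOptions{Limit: pageSize}
	for {
		list, err := client.Resource(gvr).Namespace(metav1.NamespaceAll).List(ctx, opts)
		if err != nil {
			return err
		}
		for i := range list.Items {
			handle(&list.Items[i]) // hand each object of this page to the handler
		}
		if list.GetContinue() == "" {
			return nil // no continue token: this was the last page
		}
		opts.Continue = list.GetContinue()
	}
}
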
16 changes: 8 additions & 8 deletions pkg/synchromanager/clustersynchro_manager.go
@@ -50,19 +50,19 @@ type Manager struct {

queue workqueue.RateLimitingInterface
storage storage.StorageFactory
metricsStoreBuilder *kubestatemetrics.MetricsStoreBuilder
clusterlister clusterlister.PediaClusterLister
clusterSyncResourcesLister clusterlister.ClusterSyncResourcesLister
clusterInformer cache.SharedIndexInformer

synchrolock sync.RWMutex
synchros map[string]*clustersynchro.ClusterSynchro
synchroWaitGroup wait.Group
clusterSyncConfig clustersynchro.ClusterSyncConfig
synchrolock sync.RWMutex
synchros map[string]*clustersynchro.ClusterSynchro
synchroWaitGroup wait.Group
}

var _ kubestatemetrics.ClusterMetricsWriterListGetter = &Manager{}

func NewManager(client crdclientset.Interface, storage storage.StorageFactory, metricsStoreBuilder *kubestatemetrics.MetricsStoreBuilder) *Manager {
func NewManager(client crdclientset.Interface, storage storage.StorageFactory, syncConfig clustersynchro.ClusterSyncConfig) *Manager {
factory := externalversions.NewSharedInformerFactory(client, 0)
clusterinformer := factory.Cluster().V1alpha2().PediaClusters()
clusterSyncResourcesInformer := factory.Cluster().V1alpha2().ClusterSyncResources()
@@ -72,15 +72,15 @@ func NewManager(client crdclientset.Interface, storage storage.StorageFactory, m
clusterpediaclient: client,

storage: storage,
metricsStoreBuilder: metricsStoreBuilder,
clusterlister: clusterinformer.Lister(),
clusterInformer: clusterinformer.Informer(),
clusterSyncResourcesLister: clusterSyncResourcesInformer.Lister(),
queue: workqueue.NewRateLimitingQueue(
NewItemExponentialFailureAndJitterSlowRateLimter(2*time.Second, 15*time.Second, 1*time.Minute, 1.0, defaultRetryNum),
),

synchros: make(map[string]*clustersynchro.ClusterSynchro),
clusterSyncConfig: syncConfig,
synchros: make(map[string]*clustersynchro.ClusterSynchro),
}

if _, err := clusterinformer.Informer().AddEventHandler(
@@ -348,7 +348,7 @@ func (manager *Manager) reconcileCluster(cluster *clusterv1alpha2.PediaCluster)

// create resource synchro
if synchro == nil {
synchro, err = clustersynchro.New(cluster.Name, config, manager.storage, manager.metricsStoreBuilder, manager)
synchro, err = clustersynchro.New(cluster.Name, config, manager.storage, manager, manager.clusterSyncConfig)
if err != nil {
_, forever := err.(clustersynchro.RetryableError)
klog.ErrorS(err, "Failed to create cluster synchro", "cluster", cluster.Name)
19 changes: 19 additions & 0 deletions pkg/synchromanager/features/features.go
@@ -42,9 +42,25 @@ const (
AllowSyncAllResources featuregate.Feature = "AllowSyncAllResources"

// HealthCheckerWithStandaloneTCP is a feature gate for the cluster health checker to use standalone tcp
//
// owner: @iceber
// alpha: v0.6.0
HealthCheckerWithStandaloneTCP featuregate.Feature = "HealthCheckerWithStandaloneTCP"

// ForcePaginatedListForResourceSync is a feature gate for ResourceSync's reflector to force paginated lists.
// The reflector's list may be served from the APIServer's cache; in that case the APIServer returns all resources in a single response for performance, even if paging is requested,
// which skips the reflector's streaming memory optimization.
//
// owner: @iceber
// alpha: v0.8.0
ForcePaginatedListForResourceSync featuregate.Feature = "ForcePaginatedListForResourceSync"

// StreamHandlePaginatedListForResourceSync is a feature gate for ResourceSync's reflector to stream-handle each page of a paginated list,
// resources within a pager will be processed as soon as possible instead of waiting until all resources have been pulled before calling the ResourceHandler.
//
// owner: @iceber
// alpha: v0.8.0
StreamHandlePaginatedListForResourceSync featuregate.Feature = "StreamHandlePaginatedListForResourceSync"
)

func init() {
@@ -59,4 +75,7 @@ var defaultClusterSynchroManagerFeatureGates = map[featuregate.Feature]featurega
AllowSyncAllCustomResources: {Default: false, PreRelease: featuregate.Alpha},
AllowSyncAllResources: {Default: false, PreRelease: featuregate.Alpha},
HealthCheckerWithStandaloneTCP: {Default: false, PreRelease: featuregate.Alpha},

ForcePaginatedListForResourceSync: {Default: false, PreRelease: featuregate.Alpha},
StreamHandlePaginatedListForResourceSync: {Default: false, PreRelease: featuregate.Alpha},
}
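
To make the StreamHandlePaginatedListForResourceSync description concrete: client-go's pager can invoke a callback per object as each page arrives, instead of assembling the full list before handing it to the handler. A hedged sketch of that pattern (illustrative only; not the reflector code the informer actually uses):

package example

import (
	"context"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/apimachinery/pkg/runtime/schema"
	"k8s.io/client-go/dynamic"
	"k8s.io/client-go/tools/pager"
)

// streamList pulls a resource in pages and handles every object as soon as its page
// is received, keeping roughly one page in memory at a time.
func streamList(ctx context.Context, client dynamic.Interface, gvr schema.GroupVersionResource, pageSize int64, handle func(runtime.Object)) error {
	p := pager.New(pager.SimplePageFunc(func(opts metav1.ListOptions) (runtime.Object, error) {
		return client.Resource(gvr).Namespace(metav1.NamespaceAll).List(ctx, opts)
	}))
	p.PageSize = pageSize
	return p.EachListItem(ctx, metav1.ListOptions{}, func(obj runtime.Object) error {
		handle(obj) // called page by page, not after the whole list has been collected
		return nil
	})
}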
