Skip to content

Commit

Permalink
Merge pull request #591 from Iceber/reflector_with_result_stream
Browse files Browse the repository at this point in the history
clustersynchro: handle each of the pages in the resource list stage
  • Loading branch information
Iceber authored Nov 28, 2023
2 parents f3e50db + 2fc4da1 commit 9c2e456
Show file tree
Hide file tree
Showing 15 changed files with 863 additions and 74 deletions.
3 changes: 2 additions & 1 deletion cmd/binding-apiserver/app/binding_apiserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/clusterpedia-io/clusterpedia/pkg/generated/clientset/versioned"
"github.com/clusterpedia-io/clusterpedia/pkg/storage"
"github.com/clusterpedia-io/clusterpedia/pkg/synchromanager"
"github.com/clusterpedia-io/clusterpedia/pkg/synchromanager/clustersynchro"
clusterpediafeature "github.com/clusterpedia-io/clusterpedia/pkg/utils/feature"
"github.com/clusterpedia-io/clusterpedia/pkg/version/verflag"
)
Expand Down Expand Up @@ -56,7 +57,7 @@ func NewClusterPediaServerCommand(ctx context.Context) *cobra.Command {
return err
}

synchromanager := synchromanager.NewManager(crdclient, config.StorageFactory, nil)
synchromanager := synchromanager.NewManager(crdclient, config.StorageFactory, clustersynchro.ClusterSyncConfig{})
go synchromanager.Run(1, ctx.Done())

server, err := completedConfig.New()
Expand Down
3 changes: 2 additions & 1 deletion cmd/clustersynchro-manager/app/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
kubestatemetrics "github.com/clusterpedia-io/clusterpedia/pkg/kube_state_metrics"
metrics "github.com/clusterpedia-io/clusterpedia/pkg/metrics"
"github.com/clusterpedia-io/clusterpedia/pkg/storage"
"github.com/clusterpedia-io/clusterpedia/pkg/synchromanager/clustersynchro"
)

type Config struct {
Expand All @@ -19,8 +20,8 @@ type Config struct {
WorkerNumber int
MetricsServerConfig metrics.Config
KubeMetricsServerConfig *kubestatemetrics.ServerConfig
MetricsStoreBuilder *kubestatemetrics.MetricsStoreBuilder
StorageFactory storage.StorageFactory
ClusterSyncConfig clustersynchro.ClusterSyncConfig

LeaderElection componentbaseconfig.LeaderElectionConfiguration
ClientConnection componentbaseconfig.ClientConnectionConfiguration
Expand Down
13 changes: 11 additions & 2 deletions cmd/clustersynchro-manager/app/options/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/clusterpedia-io/clusterpedia/pkg/metrics"
"github.com/clusterpedia-io/clusterpedia/pkg/storage"
storageoptions "github.com/clusterpedia-io/clusterpedia/pkg/storage/options"
"github.com/clusterpedia-io/clusterpedia/pkg/synchromanager/clustersynchro"
)

const (
Expand All @@ -44,7 +45,8 @@ type Options struct {
Metrics *metrics.Options
KubeStateMetrics *kubestatemetrics.Options

WorkerNumber int // WorkerNumber is the number of worker goroutines
WorkerNumber int // WorkerNumber is the number of worker goroutines
PageSizeForResourceSync int64
}

func NewClusterSynchroManagerOptions() (*Options, error) {
Expand Down Expand Up @@ -89,6 +91,9 @@ func (o *Options) Flags() cliflag.NamedFlagSets {
genericfs.Int32Var(&o.ClientConnection.Burst, "kube-api-burst", o.ClientConnection.Burst, "Burst to use while talking with kubernetes apiserver.")
genericfs.IntVar(&o.WorkerNumber, "worker-number", o.WorkerNumber, "The number of worker goroutines.")

syncfs := fss.FlagSet("resource sync")
syncfs.Int64Var(&o.PageSizeForResourceSync, "page-size", o.PageSizeForResourceSync, "The requested chunk size of initial and resync watch lists for resource sync")

options.BindLeaderElectionFlags(&o.LeaderElection, genericfs)

fs := fss.FlagSet("misc")
Expand Down Expand Up @@ -165,7 +170,11 @@ func (o *Options) Config() (*config.Config, error) {

MetricsServerConfig: metricsConfig,
KubeMetricsServerConfig: kubeStateMetricsServerConfig,
MetricsStoreBuilder: metricsStoreBuilder,

ClusterSyncConfig: clustersynchro.ClusterSyncConfig{
MetricsStoreBuilder: metricsStoreBuilder,
PageSizeForResourceSync: o.PageSizeForResourceSync,
},

LeaderElection: o.LeaderElection,
}, nil
Expand Down
2 changes: 1 addition & 1 deletion cmd/clustersynchro-manager/app/synchro.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ func NewClusterSynchroManagerCommand(ctx context.Context) *cobra.Command {
}

func Run(ctx context.Context, c *config.Config) error {
synchromanager := synchromanager.NewManager(c.CRDClient, c.StorageFactory, c.MetricsStoreBuilder)
synchromanager := synchromanager.NewManager(c.CRDClient, c.StorageFactory, c.ClusterSyncConfig)

go func() {
metrics.RunServer(c.MetricsServerConfig)
Expand Down
11 changes: 11 additions & 0 deletions hack/verify-vendor.sh
Original file line number Diff line number Diff line change
Expand Up @@ -58,3 +58,14 @@ else
echo "the file 'reflector.go' in vendor has been changed, please update the 'cache/.reflector.go' and 'reflector.go' in the pkg/synchromanager/clustersynchro/informer"
exit 1
fi

pager=0
diff vendor/k8s.io/client-go/tools/pager/pager.go pkg/synchromanager/clustersynchro/informer/pager/.pager.go.copy || pager=$?

if [[ $pager -eq 0 ]]
then
echo "'pager.go' is up to date."
else
echo "the file 'pager.go' in vendor has been changed, please update the '.pager.go.copy' and 'pager.go' in the pkg/synchromanager/clustersynchro/informer/pager"
exit 1
fi
36 changes: 21 additions & 15 deletions pkg/synchromanager/clustersynchro/cluster_synchro.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,19 @@ import (
clusterpediafeature "github.com/clusterpedia-io/clusterpedia/pkg/utils/feature"
)

type ClusterSyncConfig struct {
MetricsStoreBuilder *kubestatemetrics.MetricsStoreBuilder
PageSizeForResourceSync int64
}

type ClusterSynchro struct {
name string

RESTConfig *rest.Config
ClusterStatusUpdater ClusterStatusUpdater

storage storage.StorageFactory
metricsStoreBuilder *kubestatemetrics.MetricsStoreBuilder
syncConfig ClusterSyncConfig
healthChecker *healthChecker
dynamicDiscovery discovery.DynamicDiscoveryInterface
listerWatcherFactory informer.DynamicListerWatcherFactory
Expand Down Expand Up @@ -69,7 +74,7 @@ type ClusterStatusUpdater interface {

type RetryableError error

func New(name string, config *rest.Config, storage storage.StorageFactory, metricsStoreBuilder *kubestatemetrics.MetricsStoreBuilder, updater ClusterStatusUpdater) (*ClusterSynchro, error) {
func New(name string, config *rest.Config, storage storage.StorageFactory, updater ClusterStatusUpdater, syncConfig ClusterSyncConfig) (*ClusterSynchro, error) {
dynamicDiscovery, err := discovery.NewDynamicDiscoveryManager(name, config)
if err != nil {
return nil, RetryableError(fmt.Errorf("failed to create dynamic discovery manager: %w", err))
Expand Down Expand Up @@ -103,6 +108,7 @@ func New(name string, config *rest.Config, storage storage.StorageFactory, metri
ClusterStatusUpdater: updater,
storage: storage,

syncConfig: syncConfig,
healthChecker: healthChecker,
dynamicDiscovery: dynamicDiscovery,
listerWatcherFactory: listWatchFactory,
Expand All @@ -115,8 +121,6 @@ func New(name string, config *rest.Config, storage storage.StorageFactory, metri
stopRunnerCh: make(chan struct{}),

storageResourceVersions: make(map[schema.GroupVersionResource]map[string]interface{}),

metricsStoreBuilder: metricsStoreBuilder,
}

var refresherOnce sync.Once
Expand Down Expand Up @@ -352,18 +356,20 @@ func (s *ClusterSynchro) refreshSyncResources() {
}

var metricsStore *kubestatemetrics.MetricsStore
if s.metricsStoreBuilder != nil {
metricsStore = s.metricsStoreBuilder.GetMetricStore(s.name, config.syncResource)
if s.syncConfig.MetricsStoreBuilder != nil {
metricsStore = s.syncConfig.MetricsStoreBuilder.GetMetricStore(s.name, config.syncResource)
}
synchro := newResourceSynchro(
s.name,
config.syncResource,
config.kind,
s.listerWatcherFactory.ForResource(metav1.NamespaceAll, config.syncResource),
rvs,
config.convertor,
resourceStorage,
metricsStore,
synchro := newResourceSynchro(s.name,
ResourceSynchroConfig{
GroupVersionResource: config.syncResource,
Kind: config.kind,
ListerWatcher: s.listerWatcherFactory.ForResource(metav1.NamespaceAll, config.syncResource),
ObjectConvertor: config.convertor,
ResourceStorage: resourceStorage,
MetricsStore: metricsStore,
ResourceVersions: rvs,
PageSizeForInformer: s.syncConfig.PageSizeForResourceSync,
},
)
s.waitGroup.StartWithChannel(s.closer, synchro.Run)
s.storageResourceSynchros.Store(storageGVR, synchro)
Expand Down
13 changes: 12 additions & 1 deletion pkg/synchromanager/clustersynchro/informer/named_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,15 @@ type Config struct {

// WatchListPageSize is the requested chunk size of initial and relist watch lists.
WatchListPageSize int64

// StreamHandle of paginated list, resources within a pager will be processed
// as soon as possible instead of waiting until all resources are pulled before calling the ResourceHandler.
StreamHandleForPaginatedList bool

// Force paging, Reflector will sometimes use APIServer's cache,
// even if paging is specified APIServer will return all resources for performance,
// then it will skip Reflector's streaming memory optimization.
ForcePaginatedList bool
}

type controller struct {
Expand Down Expand Up @@ -86,6 +95,8 @@ func (c *controller) Run(stopCh <-chan struct{}) {
}
r.ShouldResync = c.config.ShouldResync
r.WatchListPageSize = c.config.WatchListPageSize
r.ForcePaginatedList = c.config.ForcePaginatedList
r.StreamHandleForPaginatedList = c.config.StreamHandleForPaginatedList

c.reflectorMutex.Lock()
c.reflector = r
Expand Down Expand Up @@ -120,7 +131,7 @@ func (c *controller) HasSynced() bool {
if c.queue == nil {
return false
}
return c.queue.HasSynced()
return c.queue.HasSynced() && c.reflector.HasInitializedSynced()
}

func (c *controller) LastSyncResourceVersion() string {
Expand Down
Loading

0 comments on commit 9c2e456

Please sign in to comment.