Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

✨ Cherry pick CRS related fixes #23

Merged
merged 3 commits into from
Jul 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 31 additions & 21 deletions bootstrap/kubeadm/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,25 +62,27 @@ var (
controllerName = "cluster-api-kubeadm-bootstrap-manager"

// flags.
enableLeaderElection bool
leaderElectionLeaseDuration time.Duration
leaderElectionRenewDeadline time.Duration
leaderElectionRetryPeriod time.Duration
watchFilterValue string
watchNamespace string
profilerAddress string
enableContentionProfiling bool
syncPeriod time.Duration
restConfigQPS float32
restConfigBurst int
webhookPort int
webhookCertDir string
webhookCertName string
webhookKeyName string
healthAddr string
tlsOptions = flags.TLSOptions{}
diagnosticsOptions = flags.DiagnosticsOptions{}
logOptions = logs.NewOptions()
enableLeaderElection bool
leaderElectionLeaseDuration time.Duration
leaderElectionRenewDeadline time.Duration
leaderElectionRetryPeriod time.Duration
watchFilterValue string
watchNamespace string
profilerAddress string
enableContentionProfiling bool
syncPeriod time.Duration
restConfigQPS float32
restConfigBurst int
clusterCacheTrackerClientQPS float32
clusterCacheTrackerClientBurst int
webhookPort int
webhookCertDir string
webhookCertName string
webhookKeyName string
healthAddr string
tlsOptions = flags.TLSOptions{}
diagnosticsOptions = flags.DiagnosticsOptions{}
logOptions = logs.NewOptions()
// CABPK specific flags.
clusterConcurrency int
clusterCacheTrackerConcurrency int
Expand Down Expand Up @@ -139,10 +141,16 @@ func InitFlags(fs *pflag.FlagSet) {
"The minimum interval at which watched resources are reconciled (e.g. 15m)")

fs.Float32Var(&restConfigQPS, "kube-api-qps", 20,
"Maximum queries per second from the controller client to the Kubernetes API server. Defaults to 20")
"Maximum queries per second from the controller client to the Kubernetes API server.")

fs.IntVar(&restConfigBurst, "kube-api-burst", 30,
"Maximum number of queries that should be allowed in one burst from the controller client to the Kubernetes API server. Default 30")
"Maximum number of queries that should be allowed in one burst from the controller client to the Kubernetes API server.")

fs.Float32Var(&clusterCacheTrackerClientQPS, "clustercachetracker-client-qps", 20,
"Maximum queries per second from the cluster cache tracker clients to the Kubernetes API server of workload clusters.")

fs.IntVar(&clusterCacheTrackerClientBurst, "clustercachetracker-client-burst", 30,
"Maximum number of queries that should be allowed in one burst from the cluster cache tracker clients to the Kubernetes API server of workload clusters.")

fs.DurationVar(&tokenTTL, "bootstrap-token-ttl", kubeadmbootstrapcontrollers.DefaultTokenTTL,
"The amount of time the bootstrap token will be valid")
Expand Down Expand Up @@ -312,6 +320,8 @@ func setupReconcilers(ctx context.Context, mgr ctrl.Manager) {
SecretCachingClient: secretCachingClient,
ControllerName: controllerName,
Log: &ctrl.Log,
ClientQPS: clusterCacheTrackerClientQPS,
ClientBurst: clusterCacheTrackerClientBurst,
},
)
if err != nil {
Expand Down
29 changes: 27 additions & 2 deletions controllers/remote/cluster_cache_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,11 @@ var ErrClusterLocked = errors.New("cluster is locked already")

// ClusterCacheTracker manages client caches for workload clusters.
type ClusterCacheTracker struct {
log logr.Logger
log logr.Logger

clientUncachedObjects []client.Object
clientQPS float32
clientBurst int

client client.Client

Expand Down Expand Up @@ -116,7 +119,18 @@ type ClusterCacheTrackerOptions struct {
// it'll instead query the API server directly.
// Defaults to never caching ConfigMap and Secret if not set.
ClientUncachedObjects []client.Object
Indexes []Index

// ClientQPS is the maximum queries per second from the controller client
// to the Kubernetes API server of workload clusters.
// Defaults to 20.
ClientQPS float32

// ClientBurst is the maximum number of queries that should be allowed in
// one burst from the controller client to the Kubernetes API server of workload clusters.
// Defaults to 30.
ClientBurst int

Indexes []Index

// ControllerName is the name of the controller.
// This is used to calculate the user agent string.
Expand All @@ -139,6 +153,13 @@ func setDefaultOptions(opts *ClusterCacheTrackerOptions) {
&corev1.Secret{},
}
}

if opts.ClientQPS == 0 {
opts.ClientQPS = 20
}
if opts.ClientBurst == 0 {
opts.ClientBurst = 30
}
}

// NewClusterCacheTracker creates a new ClusterCacheTracker.
Expand Down Expand Up @@ -170,6 +191,8 @@ func NewClusterCacheTracker(manager ctrl.Manager, options ClusterCacheTrackerOpt
controllerPodMetadata: controllerPodMetadata,
log: *options.Log,
clientUncachedObjects: options.ClientUncachedObjects,
clientQPS: options.ClientQPS,
clientBurst: options.ClientBurst,
client: manager.GetClient(),
secretCachingClient: options.SecretCachingClient,
scheme: manager.GetScheme(),
Expand Down Expand Up @@ -303,6 +326,8 @@ func (t *ClusterCacheTracker) newClusterAccessor(ctx context.Context, cluster cl
if err != nil {
return nil, errors.Wrapf(err, "error fetching REST client config for remote cluster %q", cluster.String())
}
config.QPS = t.clientQPS
config.Burst = t.clientBurst

// Create a http client and a mapper for the cluster.
httpClient, mapper, err := t.createHTTPClientAndMapper(config, cluster)
Expand Down
52 changes: 31 additions & 21 deletions controlplane/kubeadm/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,25 +66,27 @@ var (
controllerName = "cluster-api-kubeadm-control-plane-manager"

// flags.
enableLeaderElection bool
leaderElectionLeaseDuration time.Duration
leaderElectionRenewDeadline time.Duration
leaderElectionRetryPeriod time.Duration
watchFilterValue string
watchNamespace string
profilerAddress string
enableContentionProfiling bool
syncPeriod time.Duration
restConfigQPS float32
restConfigBurst int
webhookPort int
webhookCertDir string
webhookCertName string
webhookKeyName string
healthAddr string
tlsOptions = flags.TLSOptions{}
diagnosticsOptions = flags.DiagnosticsOptions{}
logOptions = logs.NewOptions()
enableLeaderElection bool
leaderElectionLeaseDuration time.Duration
leaderElectionRenewDeadline time.Duration
leaderElectionRetryPeriod time.Duration
watchFilterValue string
watchNamespace string
profilerAddress string
enableContentionProfiling bool
syncPeriod time.Duration
restConfigQPS float32
restConfigBurst int
clusterCacheTrackerClientQPS float32
clusterCacheTrackerClientBurst int
webhookPort int
webhookCertDir string
webhookCertName string
webhookKeyName string
healthAddr string
tlsOptions = flags.TLSOptions{}
diagnosticsOptions = flags.DiagnosticsOptions{}
logOptions = logs.NewOptions()
// KCP specific flags.
kubeadmControlPlaneConcurrency int
clusterCacheTrackerConcurrency int
Expand Down Expand Up @@ -142,10 +144,16 @@ func InitFlags(fs *pflag.FlagSet) {
"The minimum interval at which watched resources are reconciled (e.g. 15m)")

fs.Float32Var(&restConfigQPS, "kube-api-qps", 20,
"Maximum queries per second from the controller client to the Kubernetes API server. Defaults to 20")
"Maximum queries per second from the controller client to the Kubernetes API server.")

fs.IntVar(&restConfigBurst, "kube-api-burst", 30,
"Maximum number of queries that should be allowed in one burst from the controller client to the Kubernetes API server. Default 30")
"Maximum number of queries that should be allowed in one burst from the controller client to the Kubernetes API server.")

fs.Float32Var(&clusterCacheTrackerClientQPS, "clustercachetracker-client-qps", 20,
"Maximum queries per second from the cluster cache tracker clients to the Kubernetes API server of workload clusters.")

fs.IntVar(&clusterCacheTrackerClientBurst, "clustercachetracker-client-burst", 30,
"Maximum number of queries that should be allowed in one burst from the cluster cache tracker clients to the Kubernetes API server of workload clusters.")

fs.IntVar(&webhookPort, "webhook-port", 9443,
"Webhook Server port")
Expand Down Expand Up @@ -332,6 +340,8 @@ func setupReconcilers(ctx context.Context, mgr ctrl.Manager) {
&appsv1.Deployment{},
&appsv1.DaemonSet{},
},
ClientQPS: clusterCacheTrackerClientQPS,
ClientBurst: clusterCacheTrackerClientBurst,
})
if err != nil {
setupLog.Error(err, "unable to create cluster cache tracker")
Expand Down
70 changes: 55 additions & 15 deletions exp/addons/internal/controllers/clusterresourceset_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,22 @@ func (r *ClusterResourceSetReconciler) Reconcile(ctx context.Context, req ctrl.R

// Return an aggregated error if errors occurred.
if len(errs) > 0 {
// When more than one ClusterResourceSet targets the same cluster,
// there might be conflicts when reconciling those ClusterResourceSets in parallel because they all try to
// patch the same ClusterResourceSetBinding object.
// In case of patching conflicts we don't want to go into exponential backoff, otherwise it might take an
// arbitrarily long time to get to a stable state due to the backoff delay quickly growing.
// Instead, we requeue with a fixed interval to make the system a little bit more predictable (and stabilize tests).
// NOTE: Conflicts happen mostly when the ClusterResourceSetBinding is initialized / an entry is added for each
// ClusterResourceSet targeting the same cluster.
for _, err := range errs {
if aggregate, ok := err.(kerrors.Aggregate); ok {
if len(aggregate.Errors()) == 1 && apierrors.IsConflict(aggregate.Errors()[0]) {
log.Info("Conflict in patching a ClusterResourceSetBinding that is updated by more than one ClusterResourceSet, requeueing")
return ctrl.Result{RequeueAfter: 100 * time.Millisecond}, nil
}
}
}
return ctrl.Result{}, kerrors.NewAggregate(errs)
}

Expand Down Expand Up @@ -261,6 +277,40 @@ func (r *ClusterResourceSetReconciler) ApplyClusterResourceSet(ctx context.Conte
log := ctrl.LoggerFrom(ctx, "Cluster", klog.KObj(cluster))
ctx = ctrl.LoggerInto(ctx, log)

// Iterate all resources and ensure an ownerReference to the clusterResourceSet is on the resource.
// NOTE: we have to do this before getting a remote client, otherwise the owner reference won't be created until it is
// possible to connect to the remote cluster.
errList := []error{}
objList := make([]*unstructured.Unstructured, len(clusterResourceSet.Spec.Resources))
for i, resource := range clusterResourceSet.Spec.Resources {
unstructuredObj, err := r.getResource(ctx, resource, cluster.GetNamespace())
if err != nil {
if err == ErrSecretTypeNotSupported {
conditions.MarkFalse(clusterResourceSet, addonsv1.ResourcesAppliedCondition, addonsv1.WrongSecretTypeReason, clusterv1.ConditionSeverityWarning, err.Error())
} else {
conditions.MarkFalse(clusterResourceSet, addonsv1.ResourcesAppliedCondition, addonsv1.RetrievingResourceFailedReason, clusterv1.ConditionSeverityWarning, err.Error())

// Continue without adding the error to the aggregate if we can't find the resource.
if apierrors.IsNotFound(err) {
continue
}
}
errList = append(errList, err)
continue
}

// Ensure an ownerReference to the clusterResourceSet is on the resource.
if err := r.ensureResourceOwnerRef(ctx, clusterResourceSet, unstructuredObj); err != nil {
log.Error(err, "Failed to add ClusterResourceSet as resource owner reference",
"Resource type", unstructuredObj.GetKind(), "Resource name", unstructuredObj.GetName())
errList = append(errList, err)
}
objList[i] = unstructuredObj
}
if len(errList) > 0 {
return kerrors.NewAggregate(errList)
}

remoteClient, err := r.Tracker.GetClient(ctx, util.ObjectKey(cluster))
if err != nil {
conditions.MarkFalse(clusterResourceSet, addonsv1.ResourcesAppliedCondition, addonsv1.RemoteClusterClientFailedReason, clusterv1.ConditionSeverityError, err.Error())
Expand Down Expand Up @@ -298,24 +348,14 @@ func (r *ClusterResourceSetReconciler) ApplyClusterResourceSet(ctx context.Conte
Name: clusterResourceSet.Name,
UID: clusterResourceSet.UID,
}))
var errList []error

resourceSetBinding := clusterResourceSetBinding.GetOrCreateBinding(clusterResourceSet)

// Iterate all resources and apply them to the cluster and update the resource status in the ClusterResourceSetBinding object.
for _, resource := range clusterResourceSet.Spec.Resources {
unstructuredObj, err := r.getResource(ctx, resource, cluster.GetNamespace())
if err != nil {
if err == ErrSecretTypeNotSupported {
conditions.MarkFalse(clusterResourceSet, addonsv1.ResourcesAppliedCondition, addonsv1.WrongSecretTypeReason, clusterv1.ConditionSeverityWarning, err.Error())
} else {
conditions.MarkFalse(clusterResourceSet, addonsv1.ResourcesAppliedCondition, addonsv1.RetrievingResourceFailedReason, clusterv1.ConditionSeverityWarning, err.Error())

// Continue without adding the error to the aggregate if we can't find the resource.
if apierrors.IsNotFound(err) {
continue
}
}
errList = append(errList, err)
for i, resource := range clusterResourceSet.Spec.Resources {
unstructuredObj := objList[i]
if unstructuredObj == nil {
// Continue without adding the error to the aggregate if we can't find the resource.
continue
}

Expand Down
Loading
Loading