This change adds a TypedReconciler, which allows customizing the type used in the workqueue. There are a number of situations where a custom type can be a better fit than the default `reconcile.Request`:

* Multi-cluster controllers might want to put the cluster in the request
* Some controllers do not reconcile individual resources of a given type but all of them at once; ingress controllers, for example, might do this
* Some controllers do not operate on Kubernetes resources at all
Commit 64e0f0b (1 parent: e28a842)
Showing 29 changed files with 954 additions and 567 deletions.
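Before the full examples from the diff, here is a minimal sketch of the idea: a reconciler that is generic over its request type, so the workqueue carries a custom struct instead of `reconcile.Request`. It uses the `reconcile.TypedReconciler` and `reconcile.TypedFunc` names introduced by this change (both appear in the examples below); the `clusterRequest` type, its field, and the "prod" value are made up for illustration.

package main

import (
	"context"
	"fmt"

	"sigs.k8s.io/controller-runtime/pkg/reconcile"
)

// clusterRequest is a hypothetical custom workqueue item: unlike the default
// reconcile.Request it identifies a whole cluster rather than a single object.
type clusterRequest struct {
	ClusterName string
}

func main() {
	// A typed reconciler receives clusterRequest values instead of reconcile.Request.
	var r reconcile.TypedReconciler[clusterRequest] = reconcile.TypedFunc[clusterRequest](
		func(ctx context.Context, req clusterRequest) (reconcile.Result, error) {
			// A real controller would reconcile everything belonging to req.ClusterName here.
			return reconcile.Result{}, nil
		},
	)

	// Call it directly to show the custom request type flowing through.
	if _, err := r.Reconcile(context.Background(), clusterRequest{ClusterName: "prod"}); err != nil {
		fmt.Println("reconcile failed:", err)
	}
}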
@@ -0,0 +1,178 @@
package main

import (
	"context"
	"fmt"
	"os"

	corev1 "k8s.io/api/core/v1"
	apierrors "k8s.io/apimachinery/pkg/api/errors"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/types"
	ctrl "sigs.k8s.io/controller-runtime"
	"sigs.k8s.io/controller-runtime/pkg/builder"
	"sigs.k8s.io/controller-runtime/pkg/client"
	"sigs.k8s.io/controller-runtime/pkg/cluster"
	"sigs.k8s.io/controller-runtime/pkg/controller"
	"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
	"sigs.k8s.io/controller-runtime/pkg/handler"
	"sigs.k8s.io/controller-runtime/pkg/log"
	"sigs.k8s.io/controller-runtime/pkg/log/zap"
	"sigs.k8s.io/controller-runtime/pkg/manager/signals"
	"sigs.k8s.io/controller-runtime/pkg/reconcile"
	"sigs.k8s.io/controller-runtime/pkg/source"
)

func main() {
	if err := run(); err != nil {
		fmt.Fprintf(os.Stderr, "%v\n", err)
		os.Exit(1)
	}
}

const (
	sourceNamespace = "namespace-to-sync-all-secrets-from"
	targetNamespace = "namespace-to-sync-all-secrets-to"
)

func run() error {
	log.SetLogger(zap.New())

	mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{})
	if err != nil {
		return fmt.Errorf("failed to construct manager: %w", err)
	}

	allTargets := map[string]cluster.Cluster{}

	cluster, err := cluster.New(ctrl.GetConfigOrDie())
	if err != nil {
		return fmt.Errorf("failed to construct clusters: %w", err)
	}
	if err := mgr.Add(cluster); err != nil {
		return fmt.Errorf("failed to add cluster to manager: %w", err)
	}

	// Add more target clusters here as needed
	allTargets["self"] = cluster

	b := builder.TypedControllerManagedBy[request](mgr).
		Named("secret-sync").
		// Watch secrets in the source namespace of the source cluster and
		// create requests for each target cluster
		WatchesRawSource(source.TypedKind(
			mgr.GetCache(),
			&corev1.Secret{},
			handler.TypedEnqueueRequestsFromMapFunc(func(ctx context.Context, s *corev1.Secret) []request {
				if s.Namespace != sourceNamespace {
					return nil
				}

				result := make([]request, 0, len(allTargets))
				for targetCluster := range allTargets {
					result = append(result, request{
						NamespacedName: types.NamespacedName{Namespace: s.Namespace, Name: s.Name},
						clusterName:    targetCluster,
					})
				}

				return result
			}),
		)).
		WithOptions(controller.TypedOptions[request]{MaxConcurrentReconciles: 10})

	for targetClusterName, targetCluster := range allTargets {
		// Watch secrets in the target namespace of each target cluster
		// and create a request for itself.
		b = b.WatchesRawSource(source.TypedKind(
			targetCluster.GetCache(),
			&corev1.Secret{},
			handler.TypedEnqueueRequestsFromMapFunc(func(ctx context.Context, s *corev1.Secret) []request {
				if s.Namespace != targetNamespace {
					return nil
				}

				return []request{{
					NamespacedName: types.NamespacedName{Namespace: sourceNamespace, Name: s.Name},
					clusterName:    targetClusterName,
				}}
			}),
		))
	}

	clients := make(map[string]client.Client, len(allTargets))
	for targetClusterName, targetCluster := range allTargets {
		clients[targetClusterName] = targetCluster.GetClient()
	}

	if err := b.Complete(&secretSyncReconciler{
		source:  mgr.GetClient(),
		targets: clients,
	}); err != nil {
		return fmt.Errorf("failed to build reconciler: %w", err)
	}

	ctx := signals.SetupSignalHandler()
	if err := mgr.Start(ctx); err != nil {
		return fmt.Errorf("failed to start manager: %w", err)
	}

	return nil
}

// request is the custom workqueue item for this controller: the usual
// namespace/name pair plus the name of the target cluster to sync to.
type request struct {
	types.NamespacedName
	clusterName string
}

// secretSyncReconciler is a simple reconciler that keeps all secrets in the source namespace of a given
// source cluster in sync with the secrets in the target namespace of all target clusters.
type secretSyncReconciler struct {
	source  client.Client
	targets map[string]client.Client
}

func (s *secretSyncReconciler) Reconcile(ctx context.Context, req request) (reconcile.Result, error) {
	targetClient, found := s.targets[req.clusterName]
	if !found {
		return reconcile.Result{}, reconcile.TerminalError(fmt.Errorf("target cluster %s not found", req.clusterName))
	}

	var reference corev1.Secret
	if err := s.source.Get(ctx, req.NamespacedName, &reference); err != nil {
		if !apierrors.IsNotFound(err) {
			return reconcile.Result{}, fmt.Errorf("failed to get secret %s from reference cluster: %w", req.String(), err)
		}
		// The source secret no longer exists: delete its copy from the target cluster.
		if err := targetClient.Delete(ctx, &corev1.Secret{ObjectMeta: metav1.ObjectMeta{
			Name:      req.Name,
			Namespace: targetNamespace,
		}}); err != nil {
			if !apierrors.IsNotFound(err) {
				return reconcile.Result{}, fmt.Errorf("failed to delete secret %s/%s in cluster %s: %w", targetNamespace, req.Name, req.clusterName, err)
			}

			return reconcile.Result{}, nil
		}

		log.FromContext(ctx).Info("Deleted secret", "cluster", req.clusterName, "namespace", targetNamespace, "name", req.Name)
		return reconcile.Result{}, nil
	}

	target := &corev1.Secret{ObjectMeta: metav1.ObjectMeta{
		Name:      reference.Name,
		Namespace: targetNamespace,
	}}
	result, err := controllerutil.CreateOrUpdate(ctx, targetClient, target, func() error {
		target.Data = reference.Data
		return nil
	})
	if err != nil {
		return reconcile.Result{}, fmt.Errorf("failed to upsert target secret %s/%s: %w", target.Namespace, target.Name, err)
	}

	if result != controllerutil.OperationResultNone {
		log.FromContext(ctx).Info("Upserted secret", "cluster", req.clusterName, "namespace", targetNamespace, "name", req.Name, "result", result)
	}

	return reconcile.Result{}, nil
}
@@ -0,0 +1,66 @@
package main

import (
	"context"
	"fmt"
	"os"

	networkingv1 "k8s.io/api/networking/v1"
	ctrl "sigs.k8s.io/controller-runtime"
	"sigs.k8s.io/controller-runtime/pkg/builder"
	"sigs.k8s.io/controller-runtime/pkg/handler"
	"sigs.k8s.io/controller-runtime/pkg/manager/signals"
	"sigs.k8s.io/controller-runtime/pkg/reconcile"
	"sigs.k8s.io/controller-runtime/pkg/source"
)

func main() {
	if err := run(); err != nil {
		fmt.Fprintf(os.Stderr, "%v\n", err)
		os.Exit(1)
	}
}

func run() error {
	mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{})
	if err != nil {
		return fmt.Errorf("failed to construct manager: %w", err)
	}

	// Use a request type that is always equal to itself so the workqueue
	// de-duplicates all events.
	// This can for example be useful for an ingress-controller that
	// generates a config from all ingresses, rather than individual ones.
	type request struct{}

	r := reconcile.TypedFunc[request](func(ctx context.Context, _ request) (reconcile.Result, error) {
		ingressList := &networkingv1.IngressList{}
		if err := mgr.GetClient().List(ctx, ingressList); err != nil {
			return reconcile.Result{}, fmt.Errorf("failed to list ingresses: %w", err)
		}

		buildIngressConfig(ingressList)
		return reconcile.Result{}, nil
	})
	if err := builder.TypedControllerManagedBy[request](mgr).
		WatchesRawSource(source.TypedKind(
			mgr.GetCache(),
			&networkingv1.Ingress{},
			handler.TypedEnqueueRequestsFromMapFunc(func(context.Context, *networkingv1.Ingress) []request {
				return []request{{}}
			})),
		).
		Named("ingress_controller").
		Complete(r); err != nil {
		return fmt.Errorf("failed to construct ingress-controller: %w", err)
	}

	ctx := signals.SetupSignalHandler()
	if err := mgr.Start(ctx); err != nil {
		return fmt.Errorf("failed to start manager: %w", err)
	}

	return nil
}

// buildIngressConfig would generate the proxy configuration from the full set
// of ingresses; it is intentionally left as a stub in this example.
func buildIngressConfig(*networkingv1.IngressList) {}
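Since buildIngressConfig is only a stub above, here is a hypothetical sketch of an alternative body (not part of the commit, and the output format is invented): it walks every rule of every listed ingress and emits one line per host and path, where a real ingress controller would render a proxy configuration and trigger a reload instead.

// Hypothetical replacement body for the stub above: print a trivial
// host/path -> backend listing from the full set of ingresses.
func buildIngressConfig(list *networkingv1.IngressList) {
	for _, ing := range list.Items {
		for _, rule := range ing.Spec.Rules {
			if rule.HTTP == nil {
				continue
			}
			for _, path := range rule.HTTP.Paths {
				if path.Backend.Service == nil {
					continue
				}
				fmt.Printf("%s%s -> %s/%s\n", rule.Host, path.Path, ing.Namespace, path.Backend.Service.Name)
			}
		}
	}
}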