diff --git a/apis/networking/v1alpha1/wggatewayclient_types.go b/apis/networking/v1alpha1/wggatewayclient_types.go index ad705ddd10..371794475a 100644 --- a/apis/networking/v1alpha1/wggatewayclient_types.go +++ b/apis/networking/v1alpha1/wggatewayclient_types.go @@ -37,8 +37,6 @@ var WgGatewayClientGroupVersionResource = GroupVersion.WithResource(WgGatewayCli // WgGatewayClientSpec defines the desired state of WgGatewayClient. type WgGatewayClientSpec struct { - // MTU specifies the MTU of the tunnel. - MTU int `json:"mtu"` // Deployment specifies the deployment template for the client. Deployment DeploymentTemplate `json:"deployment"` } @@ -46,7 +44,7 @@ type WgGatewayClientSpec struct { // WgGatewayClientStatus defines the observed state of WgGatewayClient. type WgGatewayClientStatus struct { // SecretRef specifies the reference to the secret. - SecretRef corev1.ObjectReference `json:"secretRef,omitempty"` + SecretRef *corev1.ObjectReference `json:"secretRef,omitempty"` } // +kubebuilder:object:root=true diff --git a/apis/networking/v1alpha1/wggatewayserver_types.go b/apis/networking/v1alpha1/wggatewayserver_types.go index b5c92440be..6c21bc148f 100644 --- a/apis/networking/v1alpha1/wggatewayserver_types.go +++ b/apis/networking/v1alpha1/wggatewayserver_types.go @@ -54,8 +54,6 @@ type DeploymentTemplate struct { // WgGatewayServerSpec defines the desired state of WgGatewayServer. type WgGatewayServerSpec struct { - // MTU specifies the MTU of the tunnel. - MTU int `json:"mtu"` // Service specifies the service template for the server. Service ServiceTemplate `json:"service"` // Deployment specifies the deployment template for the server. @@ -65,9 +63,9 @@ type WgGatewayServerSpec struct { // WgGatewayServerStatus defines the observed state of WgGatewayServer. type WgGatewayServerStatus struct { // SecretRef specifies the reference to the secret. - SecretRef corev1.ObjectReference `json:"secretRef,omitempty"` + SecretRef *corev1.ObjectReference `json:"secretRef,omitempty"` // Endpoint specifies the endpoint of the server. - Endpoint EndpointStatus `json:"endpoint,omitempty"` + Endpoint *EndpointStatus `json:"endpoint,omitempty"` } // +kubebuilder:object:root=true diff --git a/apis/networking/v1alpha1/zz_generated.deepcopy.go b/apis/networking/v1alpha1/zz_generated.deepcopy.go index 677e1819e2..09f8849405 100644 --- a/apis/networking/v1alpha1/zz_generated.deepcopy.go +++ b/apis/networking/v1alpha1/zz_generated.deepcopy.go @@ -770,7 +770,7 @@ func (in *WgGatewayClient) DeepCopyInto(out *WgGatewayClient) { out.TypeMeta = in.TypeMeta in.ObjectMeta.DeepCopyInto(&out.ObjectMeta) in.Spec.DeepCopyInto(&out.Spec) - out.Status = in.Status + in.Status.DeepCopyInto(&out.Status) } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new WgGatewayClient. @@ -842,7 +842,11 @@ func (in *WgGatewayClientSpec) DeepCopy() *WgGatewayClientSpec { // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *WgGatewayClientStatus) DeepCopyInto(out *WgGatewayClientStatus) { *out = *in - out.SecretRef = in.SecretRef + if in.SecretRef != nil { + in, out := &in.SecretRef, &out.SecretRef + *out = new(v1.ObjectReference) + **out = **in + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new WgGatewayClientStatus. @@ -1009,8 +1013,16 @@ func (in *WgGatewayServerSpec) DeepCopy() *WgGatewayServerSpec { // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *WgGatewayServerStatus) DeepCopyInto(out *WgGatewayServerStatus) { *out = *in - out.SecretRef = in.SecretRef - in.Endpoint.DeepCopyInto(&out.Endpoint) + if in.SecretRef != nil { + in, out := &in.SecretRef, &out.SecretRef + *out = new(v1.ObjectReference) + **out = **in + } + if in.Endpoint != nil { + in, out := &in.Endpoint, &out.Endpoint + *out = new(EndpointStatus) + (*in).DeepCopyInto(*out) + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new WgGatewayServerStatus. diff --git a/cmd/liqo-controller-manager/main.go b/cmd/liqo-controller-manager/main.go index 2769edbb7d..93448c7dbd 100644 --- a/cmd/liqo-controller-manager/main.go +++ b/cmd/liqo-controller-manager/main.go @@ -65,6 +65,7 @@ import ( clientoperator "github.com/liqotech/liqo/pkg/liqo-controller-manager/external-network/client-operator" configurationcontroller "github.com/liqotech/liqo/pkg/liqo-controller-manager/external-network/configuration-controller" serveroperator "github.com/liqotech/liqo/pkg/liqo-controller-manager/external-network/server-operator" + wggatewaycontrollers "github.com/liqotech/liqo/pkg/liqo-controller-manager/external-network/wireguard" foreignclusteroperator "github.com/liqotech/liqo/pkg/liqo-controller-manager/foreign-cluster-operator" ipctrl "github.com/liqotech/liqo/pkg/liqo-controller-manager/ip-controller" mapsctrl "github.com/liqotech/liqo/pkg/liqo-controller-manager/namespacemap-controller" @@ -344,13 +345,42 @@ func main() { os.Exit(1) } - if err := mgr.Add(auxmgrLocalPods); err != nil { - klog.Errorf("Unable to add the auxiliary manager to the main one: %w", err) + // Create a label selector to filter only events that are part of the Gateway + reqExtNetworkPods, err := labels.NewRequirement(consts.ExternalNetworkLabel, selection.Equals, []string{consts.ExternalNetworkLabelValue}) + utilruntime.Must(err) + + // Create an accessory manager that cache only local offloaded pods. + // This manager caches only the pods that are offloaded and scheduled on a remote cluster. + auxmgrExtNetworkPods, err := ctrl.NewManager(config, ctrl.Options{ + MapperProvider: mapper.LiqoMapperProvider(scheme), + Scheme: scheme, + MetricsBindAddress: "0", // Disable the metrics of the auxiliary manager to prevent conflicts. + NewCache: func(config *rest.Config, opts cache.Options) (cache.Cache, error) { + opts.ByObject = map[client.Object]cache.ByObject{ + &corev1.Pod{}: { + Label: labels.NewSelector().Add(*reqExtNetworkPods), + }, + } + return cache.New(config, opts) + }, + }) + + if err != nil { + klog.Errorf("Unable to create auxiliary manager: %w", err) os.Exit(1) } + // Add all the auxiliary managers to the main one. + if err := mgr.Add(auxmgrLocalPods); err != nil { + klog.Errorf("Unable to add the LocalPods auxiliary manager to the main one: %w", err) + os.Exit(1) + } if err := mgr.Add(auxmgrVirtualKubeletPods); err != nil { - klog.Errorf("Unable to add the auxiliary manager to the main one: %w", err) + klog.Errorf("Unable to add the VirtualKubeletPods auxiliary manager to the main one: %w", err) + os.Exit(1) + } + if err := mgr.Add(auxmgrExtNetworkPods); err != nil { + klog.Errorf("Unable to add the ExternalNetworkPods auxiliary manager to the main one: %w", err) os.Exit(1) } @@ -626,11 +656,25 @@ func main() { klog.Errorf("Unable to start the ipReconciler", err) os.Exit(1) } + cfgr := configurationcontroller.NewConfigurationReconciler(mgr.GetClient(), mgr.GetScheme(), mgr.GetEventRecorderFor("configuration-controller")) if err = cfgr.SetupWithManager(mgr); err != nil { klog.Errorf("unable to create controller ConfigurationReconciler: %s", err) os.Exit(1) } + + wgServerRec := wggatewaycontrollers.NewWgGatewayServerReconciler( + mgr.GetClient(), mgr.GetScheme(), auxmgrExtNetworkPods.GetClient()) + if err = wgServerRec.SetupWithManager(mgr); err != nil { + klog.Errorf("Unable to start the WgGatewayServerReconciler", err) + os.Exit(1) + } + + wgClientRec := wggatewaycontrollers.NewWgGatewayClientReconciler(mgr.GetClient(), mgr.GetScheme()) + if err = wgClientRec.SetupWithManager(mgr); err != nil { + klog.Errorf("Unable to start the WgGatewayClientReconciler", err) + os.Exit(1) + } } klog.Info("starting manager as controller manager") diff --git a/deployments/liqo/charts/liqo-crds/crds/networking.liqo.io_wggatewayclients.yaml b/deployments/liqo/charts/liqo-crds/crds/networking.liqo.io_wggatewayclients.yaml index d6ecfba619..880109bd3c 100644 --- a/deployments/liqo/charts/liqo-crds/crds/networking.liqo.io_wggatewayclients.yaml +++ b/deployments/liqo/charts/liqo-crds/crds/networking.liqo.io_wggatewayclients.yaml @@ -8585,12 +8585,8 @@ spec: - template type: object type: object - mtu: - description: MTU specifies the MTU of the tunnel. - type: integer required: - deployment - - mtu type: object status: description: WgGatewayClientStatus defines the observed state of WgGatewayClient. diff --git a/deployments/liqo/charts/liqo-crds/crds/networking.liqo.io_wggatewayservers.yaml b/deployments/liqo/charts/liqo-crds/crds/networking.liqo.io_wggatewayservers.yaml index 77d8105c15..0b41283b36 100644 --- a/deployments/liqo/charts/liqo-crds/crds/networking.liqo.io_wggatewayservers.yaml +++ b/deployments/liqo/charts/liqo-crds/crds/networking.liqo.io_wggatewayservers.yaml @@ -8585,9 +8585,6 @@ spec: - template type: object type: object - mtu: - description: MTU specifies the MTU of the tunnel. - type: integer service: description: Service specifies the service template for the server. properties: @@ -8954,7 +8951,6 @@ spec: type: object required: - deployment - - mtu - service type: object status: diff --git a/pkg/consts/annotations.go b/pkg/consts/annotations.go index bab69a1e2b..0326c3a638 100644 --- a/pkg/consts/annotations.go +++ b/pkg/consts/annotations.go @@ -35,4 +35,9 @@ const ( // in the remote cluster. This annotation requires the API server support to be "remote" for the pod and the // remote service account to be created. RemoteServiceAccountNameAnnotation = "liqo.io/remote-service-account-name" + + // LabelsTemplateAnnotationKey contains a cache to store labels keys that belongs to a template. + LabelsTemplateAnnotationKey = "liqo.io/template-labels" + // AnnotsTemplateAnnotationKey contains a cache to store annotations keys that belongs to a template. + AnnotsTemplateAnnotationKey = "liqo.io/template-annotations" ) diff --git a/pkg/consts/externalnetwork.go b/pkg/consts/externalnetwork.go new file mode 100644 index 0000000000..e9acccf525 --- /dev/null +++ b/pkg/consts/externalnetwork.go @@ -0,0 +1,26 @@ +// Copyright 2019-2023 The Liqo Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package consts + +const ( + // WgServerNameLabel is the label used to indicate the name of the WireGuard server. + WgServerNameLabel = "liqo.io/wg-server-name" + // WgClientNameLabel is the label used to indicate the name of the WireGuard client. + WgClientNameLabel = "liqo.io/wg-client-name" + // ExternalNetworkLabel is the label added to all components that belong to the external network. + ExternalNetworkLabel = "liqo.io/external-network" + // ExternalNetworkLabelValue is the value of the label added to components that belong to the external network. + ExternalNetworkLabelValue = "true" +) diff --git a/pkg/liqo-controller-manager/external-network/wireguard/docs.go b/pkg/liqo-controller-manager/external-network/wireguard/docs.go new file mode 100644 index 0000000000..96748c9838 --- /dev/null +++ b/pkg/liqo-controller-manager/external-network/wireguard/docs.go @@ -0,0 +1,16 @@ +// Copyright 2019-2023 The Liqo Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package wireguard contains the logic to manage WireGuard gateway servers and clients. +package wireguard diff --git a/pkg/liqo-controller-manager/external-network/wireguard/wggatewayclient_controller.go b/pkg/liqo-controller-manager/external-network/wireguard/wggatewayclient_controller.go new file mode 100644 index 0000000000..4baeeccb52 --- /dev/null +++ b/pkg/liqo-controller-manager/external-network/wireguard/wggatewayclient_controller.go @@ -0,0 +1,213 @@ +// Copyright 2019-2023 The Liqo Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package wireguard + +import ( + "context" + "fmt" + + appsv1 "k8s.io/api/apps/v1" + corev1 "k8s.io/api/core/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/types" + utilruntime "k8s.io/apimachinery/pkg/util/runtime" + "k8s.io/klog/v2" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/builder" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" + "sigs.k8s.io/controller-runtime/pkg/handler" + "sigs.k8s.io/controller-runtime/pkg/predicate" + + networkingv1alpha1 "github.com/liqotech/liqo/apis/networking/v1alpha1" + "github.com/liqotech/liqo/pkg/consts" + liqolabels "github.com/liqotech/liqo/pkg/utils/labels" + mapsutil "github.com/liqotech/liqo/pkg/utils/maps" +) + +// WgGatewayClientReconciler manage WgGatewayClient lifecycle. +type WgGatewayClientReconciler struct { + client.Client + Scheme *runtime.Scheme +} + +// NewWgGatewayClientReconciler returns a new WgGatewayClientReconciler. +func NewWgGatewayClientReconciler(cl client.Client, s *runtime.Scheme) *WgGatewayClientReconciler { + return &WgGatewayClientReconciler{ + Client: cl, + Scheme: s, + } +} + +// cluster-role +// +kubebuilder:rbac:groups=networking.liqo.io,resources=wggatewayclients,verbs=get;list;watch;delete;create;update;patch +// +kubebuilder:rbac:groups=networking.liqo.io,resources=wggatewayclients/status,verbs=get;update;patch +// +kubebuilder:rbac:groups=apps,resources=deployments,verbs=get;list;watch;delete;create;update;patch +// +kubebuilder:rbac:groups=core,resources=secrets,verbs=get;list;watch + +// Reconcile manage GatewayClient lifecycle. +func (r *WgGatewayClientReconciler) Reconcile(ctx context.Context, req ctrl.Request) (res ctrl.Result, err error) { + wgClient := &networkingv1alpha1.WgGatewayClient{} + if err = r.Get(ctx, req.NamespacedName, wgClient); err != nil { + if apierrors.IsNotFound(err) { + klog.V(4).Infof("WireGuard gateway client %q not found", req.NamespacedName) + return ctrl.Result{}, nil + } + klog.Errorf("Unable to get the WireGuard gateway client %q: %v", req.NamespacedName, err) + return ctrl.Result{}, err + } + + if !wgClient.DeletionTimestamp.IsZero() { + // Resource is deleting and child resources are deleted as well by garbage collector. Nothing to do. + return ctrl.Result{}, nil + } + + // Ensure deployment (create or update) + deployNsName := types.NamespacedName{Namespace: wgClient.Namespace, Name: wgClient.Name} + _, err = r.ensureDeployment(ctx, wgClient, deployNsName) + if err != nil { + return ctrl.Result{}, err + } + + // Handle status + defer func() { + newErr := r.Status().Update(ctx, wgClient) + if newErr != nil { + if err != nil { + klog.Errorf("Error reconciling the WireGuard gateway client %q: %s", req.NamespacedName, err) + } + klog.Errorf("Unable to update the WireGuard gateway client status %q: %s", req.NamespacedName, newErr) + err = newErr + } + }() + + if err := r.handleSecretRefStatus(ctx, wgClient); err != nil { + return ctrl.Result{}, err + } + + return ctrl.Result{}, nil +} + +// SetupWithManager register the WgGatewayClientReconciler to the manager. +func (r *WgGatewayClientReconciler) SetupWithManager(mgr ctrl.Manager) error { + return ctrl.NewControllerManagedBy(mgr). + For(&networkingv1alpha1.WgGatewayClient{}). + Owns(&appsv1.Deployment{}). + Watches(&corev1.Secret{}, handler.EnqueueRequestsFromMapFunc(r.secretEnquerer), builder.WithPredicates(r.filterSecretsPredicate())). + Complete(r) +} + +func (r *WgGatewayClientReconciler) filterSecretsPredicate() predicate.Predicate { + filterWgClientSecrets, err := predicate.LabelSelectorPredicate(liqolabels.WgClientNameLabelSelector) + utilruntime.Must(err) + return filterWgClientSecrets +} + +func (r *WgGatewayClientReconciler) secretEnquerer(_ context.Context, obj client.Object) []ctrl.Request { + secret, ok := obj.(*corev1.Secret) + if !ok { + return nil + } + + wgClientName, found := secret.GetLabels()[consts.WgClientNameLabel] + if !found { + return nil + } + + return []ctrl.Request{ + { + NamespacedName: types.NamespacedName{ + Namespace: secret.Namespace, + Name: wgClientName, + }, + }, + } +} + +func (r *WgGatewayClientReconciler) ensureDeployment(ctx context.Context, wgClient *networkingv1alpha1.WgGatewayClient, + depNsName types.NamespacedName) (*appsv1.Deployment, error) { + dep := appsv1.Deployment{ObjectMeta: metav1.ObjectMeta{ + Name: depNsName.Name, + Namespace: depNsName.Namespace, + }} + + op, err := controllerutil.CreateOrUpdate(ctx, r.Client, &dep, func() error { + return r.mutateFnWgClientDeployment(&dep, wgClient) + }) + if err != nil { + klog.Errorf("error while creating/updating deployment %q (operation: %s): %v", depNsName, op, err) + return nil, err + } + + klog.Infof("Deployment %q correctly enforced (operation: %s)", depNsName, op) + return &dep, nil +} + +func (r *WgGatewayClientReconciler) mutateFnWgClientDeployment(deployment *appsv1.Deployment, wgClient *networkingv1alpha1.WgGatewayClient) error { + // Forge metadata + mapsutil.SmartMergeLabels(deployment, wgClient.Spec.Deployment.Metadata.GetLabels()) + mapsutil.SmartMergeAnnotations(deployment, wgClient.Spec.Deployment.Metadata.GetAnnotations()) + + // Forge spec + deployment.Spec = wgClient.Spec.Deployment.Spec + + // Set WireGuard client as owner of the deployment + return controllerutil.SetControllerReference(wgClient, deployment, r.Scheme) +} + +func (r *WgGatewayClientReconciler) handleSecretRefStatus(ctx context.Context, wgClient *networkingv1alpha1.WgGatewayClient) error { + secret, err := r.getWgClientKeysSecret(ctx, wgClient) + if err != nil { + return err + } + + // Put secret reference in WireGuard client status + if secret == nil { + // if the secret is not found, we cancel the reference as it could be not valid anymore + wgClient.Status.SecretRef = nil + } else { + wgClient.Status.SecretRef = &corev1.ObjectReference{ + Name: secret.Name, + Namespace: secret.Namespace, + } + } + + return nil +} + +func (r *WgGatewayClientReconciler) getWgClientKeysSecret(ctx context.Context, wgClient *networkingv1alpha1.WgGatewayClient) (*corev1.Secret, error) { + wgClientSelector := client.MatchingLabels{ + consts.WgClientNameLabel: wgClient.Name, // secret created by the WireGuard client with the given name + } + + var secrets corev1.SecretList + err := r.List(ctx, &secrets, client.InNamespace(wgClient.Namespace), wgClientSelector) + if err != nil { + klog.Errorf("Unable to list secrets associated to WireGuard client %s/%s: %v", wgClient.Namespace, wgClient.Name, err) + return nil, err + } + + switch len(secrets.Items) { + case 0: + klog.Warningf("Secret associated to WireGuard client %s/%s not found", wgClient.Namespace, wgClient.Name) + return nil, nil + case 1: + return &secrets.Items[0], nil + default: + return nil, fmt.Errorf("found multiple secrets associated to WireGuard client %s/%s", wgClient.Namespace, wgClient.Name) + } +} diff --git a/pkg/liqo-controller-manager/external-network/wireguard/wggatewayserver_controller.go b/pkg/liqo-controller-manager/external-network/wireguard/wggatewayserver_controller.go new file mode 100644 index 0000000000..4839956715 --- /dev/null +++ b/pkg/liqo-controller-manager/external-network/wireguard/wggatewayserver_controller.go @@ -0,0 +1,430 @@ +// Copyright 2019-2023 The Liqo Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package wireguard + +import ( + "context" + "fmt" + + "golang.org/x/exp/maps" + appsv1 "k8s.io/api/apps/v1" + corev1 "k8s.io/api/core/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/types" + utilruntime "k8s.io/apimachinery/pkg/util/runtime" + "k8s.io/klog/v2" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/builder" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" + "sigs.k8s.io/controller-runtime/pkg/handler" + "sigs.k8s.io/controller-runtime/pkg/predicate" + + networkingv1alpha1 "github.com/liqotech/liqo/apis/networking/v1alpha1" + "github.com/liqotech/liqo/pkg/consts" + "github.com/liqotech/liqo/pkg/discovery" + "github.com/liqotech/liqo/pkg/utils" + liqolabels "github.com/liqotech/liqo/pkg/utils/labels" + mapsutil "github.com/liqotech/liqo/pkg/utils/maps" +) + +// WgGatewayServerReconciler manage WgGatewayServer lifecycle. +type WgGatewayServerReconciler struct { + client.Client + Scheme *runtime.Scheme + extNetPodsClient client.Client +} + +// NewWgGatewayServerReconciler returns a new WgGatewayServerReconciler. +func NewWgGatewayServerReconciler(cl client.Client, s *runtime.Scheme, extNetPodsClient client.Client) *WgGatewayServerReconciler { + return &WgGatewayServerReconciler{ + Client: cl, + Scheme: s, + extNetPodsClient: extNetPodsClient, + } +} + +// +kubebuilder:rbac:groups=networking.liqo.io,resources=wggatewayservers,verbs=get;list;watch;delete;create;update;patch +// +kubebuilder:rbac:groups=networking.liqo.io,resources=wggatewayservers/status,verbs=get;update;patch +// +kubebuilder:rbac:groups=apps,resources=deployments,verbs=get;list;watch;delete;create;update;patch +// +kubebuilder:rbac:groups=core,resources=pods,verbs=get;list;watch +// +kubebuilder:rbac:groups=core,resources=nodes,verbs=get;list;watch +// +kubebuilder:rbac:groups=core,resources=services,verbs=get;list;watch;delete;create;update;patch +// +kubebuilder:rbac:groups=core,resources=secrets,verbs=get;list;watch + +// Reconcile manage GatewayServer lifecycle. +func (r *WgGatewayServerReconciler) Reconcile(ctx context.Context, req ctrl.Request) (res ctrl.Result, err error) { + wgServer := &networkingv1alpha1.WgGatewayServer{} + if err = r.Get(ctx, req.NamespacedName, wgServer); err != nil { + if apierrors.IsNotFound(err) { + klog.V(4).Infof("WireGuard gateway server %q not found", req.NamespacedName) + return ctrl.Result{}, nil + } + klog.Errorf("Unable to get the WireGuard gateway server %q: %v", req.NamespacedName, err) + return ctrl.Result{}, err + } + + if !wgServer.DeletionTimestamp.IsZero() { + // Resource is deleting and child resources are deleted as well by garbage collector. Nothing to do. + return ctrl.Result{}, nil + } + + // Ensure deployment (create or update) + deployNsName := types.NamespacedName{Namespace: wgServer.Namespace, Name: wgServer.Name} + deploy, err := r.ensureDeployment(ctx, wgServer, deployNsName) + if err != nil { + return ctrl.Result{}, err + } + + // Ensure service (create or update) + svcNsName := types.NamespacedName{Namespace: wgServer.Namespace, Name: wgServer.Name} + _, err = r.ensureService(ctx, wgServer, svcNsName) + if err != nil { + return ctrl.Result{}, err + } + + // Handle status + defer func() { + newErr := r.Status().Update(ctx, wgServer) + if newErr != nil { + if err != nil { + klog.Errorf("Error reconciling the WireGuard gateway server %q: %s", req.NamespacedName, err) + } + klog.Errorf("Unable to update the WireGuard gateway server status %q: %s", req.NamespacedName, newErr) + err = newErr + } + }() + + if err := r.handleEndpointStatus(ctx, wgServer, svcNsName, deploy); err != nil { + return ctrl.Result{}, err + } + + if err := r.handleSecretRefStatus(ctx, wgServer); err != nil { + return ctrl.Result{}, err + } + + return ctrl.Result{}, nil +} + +// SetupWithManager register the WgGatewayServerReconciler to the manager. +func (r *WgGatewayServerReconciler) SetupWithManager(mgr ctrl.Manager) error { + return ctrl.NewControllerManagedBy(mgr). + For(&networkingv1alpha1.WgGatewayServer{}). + Owns(&appsv1.Deployment{}). + Owns(&corev1.Service{}). + Watches(&corev1.Secret{}, handler.EnqueueRequestsFromMapFunc(r.secretEnquerer), builder.WithPredicates(r.filterSecretsPredicate())). + Complete(r) +} + +func (r *WgGatewayServerReconciler) filterSecretsPredicate() predicate.Predicate { + filterWgServerSecrets, err := predicate.LabelSelectorPredicate(liqolabels.WgServerNameLabelSelector) + utilruntime.Must(err) + return filterWgServerSecrets +} + +func (r *WgGatewayServerReconciler) secretEnquerer(_ context.Context, obj client.Object) []ctrl.Request { + secret, ok := obj.(*corev1.Secret) + if !ok { + return nil + } + + wgServerName, found := secret.GetLabels()[consts.WgServerNameLabel] + if !found { + return nil + } + + return []ctrl.Request{ + { + NamespacedName: types.NamespacedName{ + Namespace: secret.Namespace, + Name: wgServerName, + }, + }, + } +} + +func (r *WgGatewayServerReconciler) ensureDeployment(ctx context.Context, wgServer *networkingv1alpha1.WgGatewayServer, + depNsName types.NamespacedName) (*appsv1.Deployment, error) { + dep := appsv1.Deployment{ObjectMeta: metav1.ObjectMeta{ + Name: depNsName.Name, + Namespace: depNsName.Namespace, + }} + + op, err := controllerutil.CreateOrUpdate(ctx, r.Client, &dep, func() error { + return r.mutateFnWgServerDeployment(&dep, wgServer) + }) + if err != nil { + klog.Errorf("error while creating/updating deployment %q (operation: %s): %v", depNsName, op, err) + return nil, err + } + + klog.Infof("Deployment %q correctly enforced (operation: %s)", depNsName, op) + return &dep, nil +} + +func (r *WgGatewayServerReconciler) ensureService(ctx context.Context, wgServer *networkingv1alpha1.WgGatewayServer, + svcNsName types.NamespacedName) (*corev1.Service, error) { + svc := corev1.Service{ObjectMeta: metav1.ObjectMeta{ + Name: svcNsName.Name, + Namespace: svcNsName.Namespace, + }} + + op, err := controllerutil.CreateOrUpdate(ctx, r.Client, &svc, func() error { + return r.mutateFnWgServerService(&svc, wgServer) + }) + if err != nil { + klog.Errorf("error while creating/updating service %q (operation: %s): %v", svcNsName, op, err) + return nil, err + } + + klog.Infof("Service %q correctly enforced (operation: %s)", svcNsName, op) + return &svc, nil +} + +func (r *WgGatewayServerReconciler) mutateFnWgServerDeployment(deployment *appsv1.Deployment, wgServer *networkingv1alpha1.WgGatewayServer) error { + // Forge metadata + mapsutil.SmartMergeLabels(deployment, wgServer.Spec.Deployment.Metadata.GetLabels()) + mapsutil.SmartMergeAnnotations(deployment, wgServer.Spec.Deployment.Metadata.GetAnnotations()) + + // Forge spec + deployment.Spec = wgServer.Spec.Deployment.Spec + + // Set WireGuard server as owner of the deployment + return controllerutil.SetControllerReference(wgServer, deployment, r.Scheme) +} + +func (r *WgGatewayServerReconciler) mutateFnWgServerService(service *corev1.Service, wgServer *networkingv1alpha1.WgGatewayServer) error { + // Forge metadata + mapsutil.SmartMergeLabels(service, wgServer.Spec.Service.Metadata.GetLabels()) + mapsutil.SmartMergeAnnotations(service, wgServer.Spec.Service.Metadata.GetAnnotations()) + + // Forge spec + service.Spec = wgServer.Spec.Service.Spec + + // Set WireGuard server as owner of the service + return controllerutil.SetControllerReference(wgServer, service, r.Scheme) +} + +func (r *WgGatewayServerReconciler) handleEndpointStatus(ctx context.Context, wgServer *networkingv1alpha1.WgGatewayServer, + svcNsName types.NamespacedName, dep *appsv1.Deployment) error { + // Handle WireGuard server Service + var service corev1.Service + err := r.Get(ctx, svcNsName, &service) + if err != nil { + klog.Error(err) // raise an error also if service NotFound + return err + } + + // Put service endpoint in WireGuard server status + var endpointStatus *networkingv1alpha1.EndpointStatus + switch service.Spec.Type { + case corev1.ServiceTypeNodePort: + endpointStatus, err = r.forgeEndpointStatusNodePort(ctx, &service, dep) + case corev1.ServiceTypeLoadBalancer: + endpointStatus, err = r.forgeEndpointStatusLoadBalancer(&service) + default: + err = fmt.Errorf("service type %q not supported for WireGuard server Service %q", service.Spec.Type, svcNsName) + klog.Error(err) + wgServer.Status.Endpoint = nil // we empty the endpoint status to avoid misaligned spec and status + } + + if err != nil { + return err + } + + wgServer.Status.Endpoint = endpointStatus + + return nil +} + +func (r *WgGatewayServerReconciler) forgeEndpointStatusNodePort(ctx context.Context, service *corev1.Service, + dep *appsv1.Deployment) (*networkingv1alpha1.EndpointStatus, error) { + if len(service.Spec.Ports) == 0 { + err := fmt.Errorf("service %s/%s has no ports", service.Namespace, service.Name) + klog.Error(err) + return nil, err + } + + port := service.Spec.Ports[0].NodePort + protocol := &service.Spec.Ports[0].Protocol + + // Every node IP is a valid endpoint. For convenience, we get the IP of all nodes hosting replicas of the deployment + // (i.e., WireGuard gateway servers). + var addresses []string + podsFromDepSelector := client.MatchingLabelsSelector{Selector: labels.SelectorFromSet(dep.Spec.Selector.MatchLabels)} + var podList corev1.PodList + if err := r.extNetPodsClient.List(ctx, &podList, client.InNamespace(dep.Namespace), podsFromDepSelector); err != nil { + klog.Errorf("Unable to list pods of deployment %s/%s: %v", dep.Namespace, dep.Name, err) + return nil, err + } + + // Check if the number of pods found (i.e., in cache) matches the number of desired replicas. + if err := r.numPodsMatchesDesiredReplicas(len(podList.Items), dep); err != nil { + return nil, err + } + + switch len(podList.Items) { + case 0: + err := fmt.Errorf("pods of deployment %s/%s not found", dep.Namespace, dep.Name) + klog.Error(err) + return nil, err + default: + // TODO: if using active-passive, it should get the IP of the active node + // Get all nodes hosting pod replicas of the WireGuard server deployment + var nodes []*corev1.Node + for i := range podList.Items { + pod := &podList.Items[i] + // get node hosting pod + var node corev1.Node + err := r.Get(ctx, types.NamespacedName{Name: pod.Spec.NodeName}, &node) + if err != nil && !apierrors.IsNotFound(err) { + klog.Errorf("Unable to get node %q: %v", pod.Spec.NodeName, err) + return nil, err + } + if !apierrors.IsNotFound(err) { + nodes = append(nodes, &node) + } + } + // For every node, get IP address. We avoid duplicate utilizing a map and then converting to array. + // Note that duplicates should not happen if the deployment correctly have replicas spread across different nodes, + // but we double check anyway. + addressesMap := make(map[string]interface{}) + for i := range nodes { + if utils.IsNodeReady(nodes[i]) { + address, err := discovery.GetAddress(nodes[i]) + if err == nil { + addressesMap[address] = nil + } + } + } + // If addressesMap is empty, it could be due to temporary not ready nodes. + // In this case, we choose a random one (e.g., the first) + if len(addressesMap) == 0 { + if len(nodes) > 0 { + address, err := discovery.GetAddress(nodes[0]) + if err == nil { + addressesMap[address] = nil + } + } + } + // If addressesMap is still empty, we raise an error + if len(addressesMap) == 0 { + err := fmt.Errorf("no valid addresses found for WireGuard server %s/%s", service.Namespace, service.Name) + klog.Error(err) + return nil, err + } + // Addresses contains only the keys to avoid duplicates + addresses = maps.Keys(addressesMap) + } + + return &networkingv1alpha1.EndpointStatus{ + Protocol: protocol, + Port: port, + Addresses: addresses, + }, nil +} + +func (r *WgGatewayServerReconciler) numPodsMatchesDesiredReplicas(numPods int, dep *appsv1.Deployment) error { + var desiredReplicas int + if dep.Spec.Replicas != nil { + desiredReplicas = int(*dep.Spec.Replicas) + } else { + desiredReplicas = 1 // default value if field is nil, as specified in the official API + } + + if numPods != desiredReplicas { + // The number of pods listed does not match the desired replicas, possibly due to a cache sync error. + // We raise an error to force requeue. + err := fmt.Errorf("pods found for deployment %s/%s (%d) does not match desired replicas (%d), possible cache sync error", + dep.Namespace, dep.Name, numPods, desiredReplicas) + klog.Warning(err) + return err + } + + return nil +} + +func (r *WgGatewayServerReconciler) forgeEndpointStatusLoadBalancer(service *corev1.Service) (*networkingv1alpha1.EndpointStatus, error) { + if len(service.Spec.Ports) == 0 { + err := fmt.Errorf("service %s/%s has no ports", service.Namespace, service.Name) + klog.Error(err) + return nil, err + } + + port := service.Spec.Ports[0].Port + protocol := &service.Spec.Ports[0].Protocol + + var addresses []string + for i := range service.Status.LoadBalancer.Ingress { + if hostName := service.Status.LoadBalancer.Ingress[i].Hostname; hostName != "" { + addresses = append(addresses, hostName) + } + if ip := service.Status.LoadBalancer.Ingress[i].IP; ip != "" { + addresses = append(addresses, ip) + } + } + + return &networkingv1alpha1.EndpointStatus{ + Protocol: protocol, + Port: port, + Addresses: addresses, + }, nil +} + +func (r *WgGatewayServerReconciler) handleSecretRefStatus(ctx context.Context, wgServer *networkingv1alpha1.WgGatewayServer) error { + secret, err := r.getWgServerKeysSecret(ctx, wgServer) + if err != nil { + return err + } + + // Put secret reference in WireGuard server status + if secret == nil { + // if the secret is not found, we cancel the reference as it could be not valid anymore + wgServer.Status.SecretRef = nil + } else { + wgServer.Status.SecretRef = &corev1.ObjectReference{ + Name: secret.Name, + Namespace: secret.Namespace, + } + } + + return nil +} + +func (r *WgGatewayServerReconciler) getWgServerKeysSecret(ctx context.Context, wgServer *networkingv1alpha1.WgGatewayServer) (*corev1.Secret, error) { + wgServerSelector := client.MatchingLabels{ + consts.WgServerNameLabel: wgServer.Name, // secret created by the WireGuard server with the given name + } + + var secrets corev1.SecretList + err := r.List(ctx, &secrets, client.InNamespace(wgServer.Namespace), wgServerSelector) + if err != nil { + klog.Errorf("Unable to list secrets associated to WireGuard server %s/%s: %v", wgServer.Namespace, wgServer.Name, err) + return nil, err + } + + switch len(secrets.Items) { + case 0: + klog.Warningf("Secret associated to WireGuard server %s/%s not found", wgServer.Namespace, wgServer.Name) + return nil, nil + case 1: + return &secrets.Items[0], nil + default: + return nil, fmt.Errorf("found multiple secrets associated to WireGuard server %s/%s", wgServer.Namespace, wgServer.Name) + } +} diff --git a/pkg/utils/labels/labelSelectors.go b/pkg/utils/labels/labelSelectors.go index e7d3c75a0c..9da62a5800 100644 --- a/pkg/utils/labels/labelSelectors.go +++ b/pkg/utils/labels/labelSelectors.go @@ -104,6 +104,26 @@ var ( }, }, } + + // WgServerNameLabelSelector selector used to get a secret for a WireGuard gateway server. + WgServerNameLabelSelector = metav1.LabelSelector{ + MatchExpressions: []metav1.LabelSelectorRequirement{ + { + Key: liqoconst.WgServerNameLabel, + Operator: metav1.LabelSelectorOpExists, + }, + }, + } + + // WgClientNameLabelSelector selector used to get a secret for a WireGuard gateway client. + WgClientNameLabelSelector = metav1.LabelSelector{ + MatchExpressions: []metav1.LabelSelectorRequirement{ + { + Key: liqoconst.WgClientNameLabel, + Operator: metav1.LabelSelectorOpExists, + }, + }, + } ) // LocalLabelSelector returns a label selector to match local resources. diff --git a/pkg/utils/maps/maps.go b/pkg/utils/maps/maps.go index 2a133811ff..63056f0553 100644 --- a/pkg/utils/maps/maps.go +++ b/pkg/utils/maps/maps.go @@ -14,6 +14,17 @@ package maps +import ( + "fmt" + "maps" + "strings" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" + + "github.com/liqotech/liqo/pkg/consts" +) + // Merge merges two maps. func Merge[K comparable, V any](m1, m2 map[K]V) map[K]V { if m1 == nil { @@ -67,3 +78,113 @@ func FilterBlacklist[K comparable](blacklist ...K) FilterType[K] { return !FilterWhitelist(blacklist...)(check) } } + +// SmartMergeLabels merges labels from a template map in a map, and remember what labels were added in +// the object from the template, storing them in a custom annotation. This allows the function to also +// delete the labels that were added by the template previously, but that they are no longer present +// in the template. This is useful to avoid to accumulate labels in the object that are not present +// in the template anymore. +func SmartMergeLabels(obj metav1.Object, templateLabels map[string]string) { + if templateLabels == nil { + templateLabels = make(map[string]string) + } + + // Filter out the labels not present in the template but present in the cached template labels + filteredLabels := FilteredDeletedLabels(obj.GetAnnotations(), obj.GetLabels(), templateLabels) + + // Merge with current template labels + obj.SetLabels(labels.Merge(filteredLabels, templateLabels)) + + // Update cache with latest template labels + obj.SetAnnotations(UpdateCache(obj.GetAnnotations(), templateLabels, consts.LabelsTemplateAnnotationKey)) +} + +// SmartMergeAnnotations merges annotations from a template map in a map, and remember what annotations were added in +// the object from the template, storing them in a custom annotation. This allows the function to also +// delete the annotations that were added by the template previously, but that they are no longer present +// in the template. This is useful to avoid to accumulate annotations in the object that are not present +// in the template anymore. +func SmartMergeAnnotations(obj metav1.Object, templateAnnots map[string]string) { + if templateAnnots == nil { + templateAnnots = make(map[string]string) + } + + // Filter out the annotations not present in the template but present in the cached template annotations + filteredAnnots := FilteredDeletedAnnotations(obj.GetAnnotations(), templateAnnots) + + // Merge with current template annotations + obj.SetAnnotations(labels.Merge(filteredAnnots, templateAnnots)) + + // Update cache with latest template annotations + obj.SetAnnotations(UpdateCache(obj.GetAnnotations(), templateAnnots, consts.AnnotsTemplateAnnotationKey)) +} + +// FilteredDeletedLabels returns the labels of the object after deleting the labels not present +// in the template but present in the cached template labels (e.g., the ones added by the template). +func FilteredDeletedLabels(annots, labs, templateLabs map[string]string) map[string]string { + // Get cached labels of the template + var cachedTemplateLabelsKey []string + if annots != nil { + cache := annots[consts.LabelsTemplateAnnotationKey] + cachedTemplateLabelsKey = DeSerializeCache(cache) + } + + // Delete from objet labels the entries not present in the template labels + return FilteredDeletedEntries(labs, templateLabs, cachedTemplateLabelsKey) +} + +// FilteredDeletedAnnotations returns the annotations of the object after deleting the annotations not present +// in the template but present in the cached template annotations (e.g., the ones added by the template). +func FilteredDeletedAnnotations(annots, templateAnnots map[string]string) map[string]string { + // Get cached annotations of the template + var cachedTemplateAnnotsKeys []string + if annots != nil { + cache := annots[consts.AnnotsTemplateAnnotationKey] + cachedTemplateAnnotsKeys = DeSerializeCache(cache) + } + + // Delete from objet annotations the entries not present in the template annotations + return FilteredDeletedEntries(annots, templateAnnots, cachedTemplateAnnotsKeys) +} + +// FilteredDeletedEntries deletes entries of map m1 that are not present in map m2, +// excluding the ones not stored in a cache of keys. +func FilteredDeletedEntries(m1, m2 map[string]string, cache []string) map[string]string { + res := maps.Clone(m1) + if res == nil { + res = make(map[string]string) + } + if m2 == nil { + m2 = make(map[string]string) + } + for _, key := range cache { + if _, exists := m2[key]; !exists { + delete(res, key) + } + } + return res +} + +// UpdateCache updates the cache with the latest entries from template. +func UpdateCache(annots, template map[string]string, cacheKey string) map[string]string { + if annots == nil { + annots = make(map[string]string) + } + // Update cache with latest template labels + annots[cacheKey] = SerializeMap(template) + return annots +} + +// SerializeMap convert a map in a string of concatenated keys seprated by commas. +func SerializeMap(m map[string]string) string { + serialized := "" + for k := range m { + serialized += fmt.Sprintf("%s,", k) + } + return serialized +} + +// DeSerializeCache splits a serialized map. +func DeSerializeCache(s string) []string { + return strings.Split(s, ",") +}