Skip to content

Commit

Permalink
gateway server and client controllers
Browse files Browse the repository at this point in the history
  • Loading branch information
aleoli committed Sep 22, 2023
1 parent 6336421 commit 450acf3
Show file tree
Hide file tree
Showing 14 changed files with 924 additions and 0 deletions.
37 changes: 37 additions & 0 deletions cmd/liqo-controller-manager/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ import (
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/selection"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/dynamic/dynamicinformer"
"k8s.io/client-go/kubernetes"
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
_ "k8s.io/client-go/plugin/pkg/client/auth/gcp"
Expand All @@ -49,12 +51,16 @@ import (

discoveryv1alpha1 "github.com/liqotech/liqo/apis/discovery/v1alpha1"
netv1alpha1 "github.com/liqotech/liqo/apis/net/v1alpha1"
networkingv1alpha1 "github.com/liqotech/liqo/apis/networking/v1alpha1"
offloadingv1alpha1 "github.com/liqotech/liqo/apis/offloading/v1alpha1"
sharingv1alpha1 "github.com/liqotech/liqo/apis/sharing/v1alpha1"
virtualkubeletv1alpha1 "github.com/liqotech/liqo/apis/virtualkubelet/v1alpha1"
"github.com/liqotech/liqo/cmd/virtual-kubelet/root"
"github.com/liqotech/liqo/pkg/consts"
identitymanager "github.com/liqotech/liqo/pkg/identityManager"
clientoperator "github.com/liqotech/liqo/pkg/liqo-controller-manager/external-network/client-operator"
serveroperator "github.com/liqotech/liqo/pkg/liqo-controller-manager/external-network/server-operator"
enutils "github.com/liqotech/liqo/pkg/liqo-controller-manager/external-network/utils"
foreignclusteroperator "github.com/liqotech/liqo/pkg/liqo-controller-manager/foreign-cluster-operator"
mapsctrl "github.com/liqotech/liqo/pkg/liqo-controller-manager/namespacemap-controller"
nsoffctrl "github.com/liqotech/liqo/pkg/liqo-controller-manager/namespaceoffloading-controller"
Expand Down Expand Up @@ -97,6 +103,7 @@ func init() {
_ = discoveryv1alpha1.AddToScheme(scheme)
_ = offloadingv1alpha1.AddToScheme(scheme)
_ = virtualkubeletv1alpha1.AddToScheme(scheme)
_ = networkingv1alpha1.AddToScheme(scheme)
}

func main() {
Expand All @@ -110,6 +117,8 @@ func main() {
var kubeletMetricsEnabled bool
var labelsNotReflected argsutils.StringList
var annotationsNotReflected argsutils.StringList
var gatewayServerResources argsutils.StringList
var gatewayClientResources argsutils.StringList

webhookPort := flag.Uint("webhook-port", 9443, "The port the webhook server binds to")
metricsAddr := flag.String("metrics-address", ":8080", "The address the metric endpoint binds to")
Expand Down Expand Up @@ -188,6 +197,10 @@ func main() {
// Node failure controller parameter
enableNodeFailureController := flag.Bool("enable-node-failure-controller", false, "Enable the node failure controller")

// External network parameters
flag.Var(&gatewayServerResources, "gateway-server-resources", "The list of resource types that implements the gateway server. They must be in the form <group>/<version>/<resource>")
flag.Var(&gatewayClientResources, "gateway-client-resources", "The list of resource types that implements the gateway client. They must be in the form <group>/<version>/<resource>")

liqoerrors.InitFlags(nil)
restcfg.InitFlags(nil)
klog.InitFlags(nil)
Expand Down Expand Up @@ -222,6 +235,11 @@ func main() {

config := restcfg.SetRateLimiter(ctrl.GetConfigOrDie())

dynClient := dynamic.NewForConfigOrDie(config)
factory := &enutils.RunnableFactory{
DynamicSharedInformerFactory: dynamicinformer.NewFilteredDynamicSharedInformerFactory(dynClient, 0, corev1.NamespaceAll, nil),
}

// Create a label selector to filter only the events for pods managed by a ShadowPod (i.e., remote offloaded pods),
// as those are the only ones we are interested in to implement the resiliency mechanism.
reqRemoteLiqoPods, err := labels.NewRequirement(consts.ManagedByLabelKey, selection.Equals, []string{consts.ManagedByShadowPodValue})
Expand Down Expand Up @@ -254,6 +272,11 @@ func main() {
os.Exit(1)
}

if err = mgr.Add(factory); err != nil {
klog.Error(err)
os.Exit(1)
}

// Create a label selector to filter only the events for local offloaded pods
reqLocalLiqoPods, err := labels.NewRequirement(consts.LocalPodLabelKey, selection.Equals, []string{consts.LocalPodLabelValue})
utilruntime.Must(err)
Expand Down Expand Up @@ -478,6 +501,20 @@ func main() {
klog.Fatal(err)
}

serverReconciler := serveroperator.NewServerReconciler(ctx, mgr.GetClient(),
dynClient, factory, mgr.GetScheme(), gatewayServerResources.StringList)
if err := serverReconciler.SetupWithManager(mgr); err != nil {
klog.Error(err)
os.Exit(1)
}

clientReconciler := clientoperator.NewClientReconciler(ctx, mgr.GetClient(),
dynClient, factory, mgr.GetScheme(), gatewayClientResources.StringList)
if err := clientReconciler.SetupWithManager(mgr); err != nil {
klog.Error(err)
os.Exit(1)
}

// Start the handler to approve the virtual kubelet certificate signing requests.
csrWatcher := csr.NewWatcher(clientset, *resyncPeriod, labels.Everything(), fields.Everything())
csrWatcher.RegisterHandler(csr.ApproverHandler(clientset, "LiqoApproval", "This CSR was approved by Liqo",
Expand Down
24 changes: 24 additions & 0 deletions deployments/liqo/files/liqo-controller-manager-ClusterRole.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -291,6 +291,30 @@ rules:
- get
- update
- watch
- apiGroups:
- networking.liqo.io
resources:
- gatewayclients
verbs:
- create
- delete
- get
- list
- patch
- update
- watch
- apiGroups:
- networking.liqo.io
resources:
- gatewayservers
verbs:
- create
- delete
- get
- list
- patch
- update
- watch
- apiGroups:
- offloading.liqo.io
resources:
Expand Down
11 changes: 11 additions & 0 deletions deployments/liqo/templates/_helpers.tpl
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,17 @@ Concatenates a values list into a string in the form "--commandName=val1,val2"
- {{ trimSuffix "," $res }}
{{- end -}}

{{/*
Concatenates a values list of groupVersionResources into a string in the form "--commandName=group1/version1/resource1,group2/version2/resource2"
*/}}
{{- define "liqo.concatenateGroupVersionResources" -}}
{{- $res := print .commandName "=" -}}
{{- range $val := .list -}}
{{- $res = print $res $val.apiVersion "/" $val.resource "," -}}
{{- end -}}
- {{ trimSuffix "," $res }}
{{- end -}}

{{/*
Get the liqo clusterID ConfigMap name
*/}}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,10 @@ spec:
- --configmap-reflection-type={{ .Values.reflection.configmap.type }}
- --secret-reflection-type={{ .Values.reflection.secret.type }}
- --event-reflection-type={{ .Values.reflection.event.type }}
{{- $d := dict "commandName" "--gateway-server-resources" "list" .Values.networking.serverResources }}
{{- include "liqo.concatenateGroupVersionResources" $d | nindent 10 }}
{{- $d := dict "commandName" "--gateway-client-resources" "list" .Values.networking.clientResources }}
{{- include "liqo.concatenateGroupVersionResources" $d | nindent 10 }}
{{- if .Values.reflection.skip.labels }}
{{- $d := dict "commandName" "--labels-not-reflected" "list" .Values.reflection.skip.labels }}
{{- include "liqo.concatenateList" $d | nindent 10 }}
Expand Down
8 changes: 8 additions & 0 deletions deployments/liqo/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,14 @@ networking:
# The default value is configured to ensure correct behavior regardless of the combination of the underlying environments
# (e.g., cloud providers). This guarantees improved compatibility at the cost of possible limited performance drops.
mtu: 1340
# -- Set the list of resources that implement the GatewayServer
serverResources:
- apiVersion: networking.liqo.io/v1alpha1
resource: wggatewayservers
# -- Set the list of resources that implement the GatewayClient
clientResources:
- apiVersion: networking.liqo.io/v1alpha1
resource: wggatewayclients

reflection:
skip:
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,205 @@
// Copyright 2019-2023 The Liqo Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package clientoperator

import (
"context"
"fmt"
"strings"

corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/dynamic"
"k8s.io/klog/v2"
"k8s.io/utils/pointer"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"

networkingv1alpha1 "github.com/liqotech/liqo/apis/networking/v1alpha1"
enutils "github.com/liqotech/liqo/pkg/liqo-controller-manager/external-network/utils"
)

// ClientReconciler manage GatewayClient lifecycle.
type ClientReconciler struct {
client.Client
Scheme *runtime.Scheme
DynClient dynamic.Interface
Factory *enutils.RunnableFactory
ClientResources []string
}

// NewClientReconciler returns a new ClientReconciler.
func NewClientReconciler(ctx context.Context,
cl client.Client, dynClient dynamic.Interface,
factory *enutils.RunnableFactory, s *runtime.Scheme,
clientResources []string) *ClientReconciler {
return &ClientReconciler{
Client: cl,
Scheme: s,
DynClient: dynClient,
Factory: factory,
ClientResources: clientResources,
}
}

// cluster-role
// +kubebuilder:rbac:groups=networking.liqo.io,resources=gatewayclients,verbs=get;list;watch;delete;create;update;patch

// Reconcile manage GatewayClient lifecycle.
func (r *ClientReconciler) Reconcile(ctx context.Context, req ctrl.Request) (res ctrl.Result, err error) {
gwClient := &networkingv1alpha1.GatewayClient{}
if err = r.Get(ctx, req.NamespacedName, gwClient); err != nil {
if apierrors.IsNotFound(err) {
klog.Infof("Gateway client %q not found", req.NamespacedName)
return ctrl.Result{}, nil
}
klog.Errorf("Unable to get the gateway client %q: %s", req.NamespacedName, err)
return ctrl.Result{}, err
}

defer func() {
newErr := r.Status().Update(ctx, gwClient)
if newErr != nil {
if err != nil {
klog.Errorf("Error reconciling the gateway client %q: %s", req.NamespacedName, err)
}
klog.Errorf("Unable to update the gateway client %q: %s", req.NamespacedName, newErr)
err = newErr
}
}()

if err = r.EnsureGatewayClient(ctx, gwClient); err != nil {
klog.Errorf("Unable to ensure the gateway client %q: %s", req.NamespacedName, err)
return ctrl.Result{}, err
}

return ctrl.Result{}, nil
}

// EnsureGatewayClient ensures the GatewayClient is correctly configured.
func (r *ClientReconciler) EnsureGatewayClient(ctx context.Context, gwClient *networkingv1alpha1.GatewayClient) error {
templateGV, err := schema.ParseGroupVersion(gwClient.Spec.ClientTemplateRef.APIVersion)
if err != nil {
return fmt.Errorf("unable to parse the client template group version: %w", err)
}

templateGVR := schema.GroupVersionResource{
Group: templateGV.Group,
Version: templateGV.Version,
Resource: enutils.KindToResource(gwClient.Spec.ClientTemplateRef.Kind),
}
template, err := r.DynClient.Resource(templateGVR).
Namespace(gwClient.Spec.ClientTemplateRef.Namespace).
Get(ctx, gwClient.Spec.ClientTemplateRef.Name, metav1.GetOptions{})
if err != nil {
return fmt.Errorf("unable to get the client template: %w", err)
}

templateSpec, ok := template.Object["spec"].(map[string]interface{})
if !ok {
return fmt.Errorf("unable to get the spec of the client template")
}
objectKindInt, ok := templateSpec["objectKind"].(map[string]interface{})
if !ok {
return fmt.Errorf("unable to get the object kind of the client template")
}
objectKind := metav1.TypeMeta{
Kind: objectKindInt["kind"].(string),
APIVersion: objectKindInt["apiVersion"].(string),
}
objectTemplate, ok := templateSpec["template"].(map[string]interface{})
if !ok {
return fmt.Errorf("unable to get the template of the client template")
}
objectTemplateMetadataInt, ok := objectTemplate["metadata"].(map[string]interface{})
if !ok {
return fmt.Errorf("unable to get the metadata of the client template")
}
objectTemplateMetadata := metav1.ObjectMeta{
Name: enutils.GetValueOrDefault(objectTemplateMetadataInt, "name", gwClient.Name),
Namespace: enutils.GetValueOrDefault(objectTemplateMetadataInt, "namespace", gwClient.Namespace),
Labels: enutils.TranslateMap(objectTemplateMetadataInt["labels"]),
Annotations: enutils.TranslateMap(objectTemplateMetadataInt["annotations"]),
}
objectTemplateSpec, ok := objectTemplate["spec"].(map[string]interface{})
if !ok {
return fmt.Errorf("unable to get the spec of the client template")
}

unstructuredObject, err := enutils.CreateOrPatch(ctx, r.DynClient.Resource(objectKind.GroupVersionKind().GroupVersion().WithResource(enutils.KindToResource(objectKind.Kind))).
Namespace(gwClient.Namespace), gwClient.Name, func(obj *unstructured.Unstructured) error {
obj.SetGroupVersionKind(objectKind.GroupVersionKind())
obj.SetName(gwClient.Name)
obj.SetNamespace(gwClient.Namespace)
obj.SetLabels(objectTemplateMetadata.Labels)
obj.SetAnnotations(objectTemplateMetadata.Annotations)
obj.SetOwnerReferences([]metav1.OwnerReference{
{
APIVersion: gwClient.APIVersion,
Kind: gwClient.Kind,
Name: gwClient.Name,
UID: gwClient.UID,
Controller: pointer.Bool(true),
},
})
spec, err := enutils.RenderTemplate(objectTemplateSpec, gwClient.Spec)
if err != nil {
return fmt.Errorf("unable to render the template: %w", err)
}
obj.Object["spec"] = spec
return nil
})
if err != nil {
return fmt.Errorf("unable to update the client: %w", err)
}

gwClient.Status.ClientRef = corev1.ObjectReference{
APIVersion: unstructuredObject.GetAPIVersion(),
Kind: unstructuredObject.GetKind(),
Name: unstructuredObject.GetName(),
Namespace: unstructuredObject.GetNamespace(),
UID: unstructuredObject.GetUID(),
}

return nil
}

// SetupWithManager register the ClientReconciler to the manager.
func (r *ClientReconciler) SetupWithManager(mgr ctrl.Manager) error {
ownerEnqueuer := enutils.NewOwnerEnqueuer(networkingv1alpha1.GatewayClientKind)
factorySource := enutils.NewFactorySource(r.Factory)

for _, resource := range r.ClientResources {
tmp := strings.Split(resource, "/")
if len(tmp) != 3 {
return fmt.Errorf("invalid resource %q", resource)
}
gvr := schema.GroupVersionResource{
Group: tmp[0],
Version: tmp[1],
Resource: tmp[2],
}
factorySource.ForResource(gvr)
}

return ctrl.NewControllerManagedBy(mgr).
WatchesRawSource(factorySource.Source(), ownerEnqueuer).
For(&networkingv1alpha1.GatewayClient{}).
Complete(r)
}
Loading

0 comments on commit 450acf3

Please sign in to comment.