diff --git a/.github/workflows/integration.yml b/.github/workflows/integration.yml index d48a778246..c11f6b2139 100644 --- a/.github/workflows/integration.yml +++ b/.github/workflows/integration.yml @@ -91,6 +91,7 @@ jobs: - telemetry - proxy - gateway/tunnel/wireguard + - gateway/gateway steps: - name: Set up QEMU diff --git a/apis/networking/v1alpha1/connection_types.go b/apis/networking/v1alpha1/connection_types.go index 3b9d5cdbc1..27ed0ed7d6 100644 --- a/apis/networking/v1alpha1/connection_types.go +++ b/apis/networking/v1alpha1/connection_types.go @@ -38,21 +38,22 @@ var ConnectionGroupVersionResource = GroupVersion.WithResource(ConnectionResourc // ConnectionType represents the type of a connection. type ConnectionType string +// ConnectionStatusValue represents the status of a connection. +type ConnectionStatusValue string + const ( // ConnectionTypeServer represents a server connection. ConnectionTypeServer ConnectionType = "Server" // ConnectionTypeClient represents a client connection. ConnectionTypeClient ConnectionType = "Client" -) -// PingSpec defines the desired state of Ping. -type PingSpec struct { - // Enabled specifies whether the ping is enabled or not. - // +kubebuilder:default=true - Enabled *bool `json:"enabled,omitempty"` - // Endpoint specifies the endpoint to ping. - Endpoint EndpointStatus `json:"endpoint,omitempty"` -} + // Connected used when the connection is up and running. + Connected ConnectionStatusValue = "Connected" + // Connecting used as temporary status while waiting for the vpn tunnel to come up. + Connecting ConnectionStatusValue = "Connecting" + // ConnectionError used to se the status in case of errors. + ConnectionError ConnectionStatusValue = "Error" +) // ConnectionSpec defines the desired state of Connection. type ConnectionSpec struct { @@ -61,62 +62,31 @@ type ConnectionSpec struct { Type ConnectionType `json:"type"` // GatewayRef specifies the reference to the gateway. GatewayRef corev1.ObjectReference `json:"gatewayRef"` - // Ping specifies the ping configuration. - Ping PingSpec `json:"ping,omitempty"` } -// ConnectionConditionType represents different conditions that a connection could assume. -type ConnectionConditionType string - -const ( - // ConnectionConditionEstablished represents a connection that is established. - ConnectionConditionEstablished ConnectionConditionType = "Established" - // ConnectionConditionPending represents a connection that is pending. - ConnectionConditionPending ConnectionConditionType = "Pending" - // ConnectionConditionDenied represents a connection that is denied. - ConnectionConditionDenied ConnectionConditionType = "Denied" - // ConnectionConditionError represents a connection that is in error. - ConnectionConditionError ConnectionConditionType = "Error" -) - -// ConnectionConditionStatusType represents the status of a connection condition. -type ConnectionConditionStatusType string - -const ( - // ConnectionConditionStatusTrue represents a connection condition that is true. - ConnectionConditionStatusTrue ConnectionConditionStatusType = "True" - // ConnectionConditionStatusFalse represents a connection condition that is false. - ConnectionConditionStatusFalse ConnectionConditionStatusType = "False" - // ConnectionConditionStatusUnknown represents a connection condition that is unknown. - ConnectionConditionStatusUnknown ConnectionConditionStatusType = "Unknown" -) - -// ConnectionCondition contains details about state of the connection. -type ConnectionCondition struct { - // Type of the connection condition. - // +kubebuilder:validation:Enum="Established" - Type ConnectionConditionType `json:"type"` - // Status of the condition. - // +kubebuilder:validation:Enum="True";"False";"Unknown" - // +kubebuilder:default="Unknown" - Status ConnectionConditionStatusType `json:"status"` - // LastTransitionTime -> timestamp for when the condition last transitioned from one status to another. - LastTransitionTime metav1.Time `json:"lastTransitionTime,omitempty"` - // Reason -> Machine-readable, UpperCamelCase text indicating the reason for the condition's last transition. - Reason string `json:"reason,omitempty"` - // Message -> Human-readable message indicating details about the last status transition. - Message string `json:"message,omitempty"` +// ConnectionLatency represents the latency between two clusters. +type ConnectionLatency struct { + // Value of the latency. + Value string `json:"value,omitempty"` + // Timestamp of the latency. + Timestamp metav1.Time `json:"timestamp,omitempty"` } // ConnectionStatus defines the observed state of Connection. type ConnectionStatus struct { - // Conditions contains the conditions of the connection. - Conditions []ConnectionCondition `json:"conditions,omitempty"` + // Value of the connection. + Value ConnectionStatusValue `json:"value,omitempty"` + // Latency of the connection. + Latency ConnectionLatency `json:"latency,omitempty"` } // +kubebuilder:object:root=true // +kubebuilder:resource:categories=liqo // +kubebuilder:subresource:status +// +kubebuilder:printcolumn:name="Type",type=string,JSONPath=`.spec.type` +// +kubebuilder:printcolumn:name="Latency",type=string,JSONPath=`.status.latency.value`,priority=1 +// +kubebuilder:printcolumn:name="Status",type=string,JSONPath=`.status.value` +// +kubebuilder:printcolumn:name="Age",type=date,JSONPath=`.metadata.creationTimestamp` // Connection contains the status of a connection between two clusters (a client and a server). type Connection struct { diff --git a/apis/networking/v1alpha1/zz_generated.deepcopy.go b/apis/networking/v1alpha1/zz_generated.deepcopy.go index 09f8849405..e298fd3eea 100644 --- a/apis/networking/v1alpha1/zz_generated.deepcopy.go +++ b/apis/networking/v1alpha1/zz_generated.deepcopy.go @@ -180,7 +180,7 @@ func (in *Connection) DeepCopyInto(out *Connection) { *out = *in out.TypeMeta = in.TypeMeta in.ObjectMeta.DeepCopyInto(&out.ObjectMeta) - in.Spec.DeepCopyInto(&out.Spec) + out.Spec = in.Spec in.Status.DeepCopyInto(&out.Status) } @@ -203,17 +203,17 @@ func (in *Connection) DeepCopyObject() runtime.Object { } // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. -func (in *ConnectionCondition) DeepCopyInto(out *ConnectionCondition) { +func (in *ConnectionLatency) DeepCopyInto(out *ConnectionLatency) { *out = *in - in.LastTransitionTime.DeepCopyInto(&out.LastTransitionTime) + in.Timestamp.DeepCopyInto(&out.Timestamp) } -// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ConnectionCondition. -func (in *ConnectionCondition) DeepCopy() *ConnectionCondition { +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ConnectionLatency. +func (in *ConnectionLatency) DeepCopy() *ConnectionLatency { if in == nil { return nil } - out := new(ConnectionCondition) + out := new(ConnectionLatency) in.DeepCopyInto(out) return out } @@ -254,7 +254,6 @@ func (in *ConnectionList) DeepCopyObject() runtime.Object { func (in *ConnectionSpec) DeepCopyInto(out *ConnectionSpec) { *out = *in out.GatewayRef = in.GatewayRef - in.Ping.DeepCopyInto(&out.Ping) } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ConnectionSpec. @@ -270,13 +269,7 @@ func (in *ConnectionSpec) DeepCopy() *ConnectionSpec { // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *ConnectionStatus) DeepCopyInto(out *ConnectionStatus) { *out = *in - if in.Conditions != nil { - in, out := &in.Conditions, &out.Conditions - *out = make([]ConnectionCondition, len(*in)) - for i := range *in { - (*in)[i].DeepCopyInto(&(*out)[i]) - } - } + in.Latency.DeepCopyInto(&out.Latency) } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ConnectionStatus. @@ -648,27 +641,6 @@ func (in *GatewayServerStatus) DeepCopy() *GatewayServerStatus { return out } -// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. -func (in *PingSpec) DeepCopyInto(out *PingSpec) { - *out = *in - if in.Enabled != nil { - in, out := &in.Enabled, &out.Enabled - *out = new(bool) - **out = **in - } - in.Endpoint.DeepCopyInto(&out.Endpoint) -} - -// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new PingSpec. -func (in *PingSpec) DeepCopy() *PingSpec { - if in == nil { - return nil - } - out := new(PingSpec) - in.DeepCopyInto(out) - return out -} - // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *PublicKey) DeepCopyInto(out *PublicKey) { *out = *in diff --git a/cmd/gateway/gateway/main.go b/cmd/gateway/gateway/main.go new file mode 100644 index 0000000000..ac9b922899 --- /dev/null +++ b/cmd/gateway/gateway/main.go @@ -0,0 +1,127 @@ +// Copyright 2019-2023 The Liqo Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package wireguard contains the logic to configure the Wireguard interface. +package main + +import ( + "flag" + "fmt" + "os" + + "github.com/spf13/cobra" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/client-go/tools/leaderelection/resourcelock" + "k8s.io/klog/v2" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client/config" + "sigs.k8s.io/controller-runtime/pkg/log" + + networkingv1alpha1 "github.com/liqotech/liqo/apis/networking/v1alpha1" + "github.com/liqotech/liqo/pkg/gateway/connection" + flagsutils "github.com/liqotech/liqo/pkg/utils/flags" + "github.com/liqotech/liqo/pkg/utils/mapper" + "github.com/liqotech/liqo/pkg/utils/restcfg" +) + +var ( + addToSchemeFunctions = []func(*runtime.Scheme) error{ + networkingv1alpha1.AddToScheme, + } + options = connection.NewOptions() +) + +func main() { + var cmd = cobra.Command{ + Use: "liqo-gateway", + RunE: run, + } + + legacyflags := flag.NewFlagSet("legacy", flag.ExitOnError) + restcfg.InitFlags(legacyflags) + klog.InitFlags(legacyflags) + flagsutils.FromFlagToPflag(legacyflags, cmd.Flags()) + + connection.InitFlags(cmd.Flags(), options) + if err := connection.MarkFlagsRequired(&cmd); err != nil { + klog.Error(err) + os.Exit(1) + } + + if err := cmd.Execute(); err != nil { + klog.Error(err) + os.Exit(1) + } +} + +func run(_ *cobra.Command, _ []string) error { + var err error + ctx := ctrl.SetupSignalHandler() + scheme := runtime.NewScheme() + + // Adds the APIs to the scheme. + for _, addToScheme := range addToSchemeFunctions { + if err = addToScheme(scheme); err != nil { + return fmt.Errorf("unable to add scheme: %w", err) + } + } + + // Set controller-runtime logger. + log.SetLogger(klog.NewKlogr()) + + // Get the rest config. + cfg := config.GetConfigOrDie() + + // Create the manager. + mgr, err := ctrl.NewManager(cfg, ctrl.Options{ + MapperProvider: mapper.LiqoMapperProvider(scheme), + Scheme: scheme, + Namespace: options.Namespace, + MetricsBindAddress: "0", // Metrics are exposed by "connection" container. + HealthProbeBindAddress: options.ProbeAddr, + LeaderElection: options.LeaderElection, + LeaderElectionID: fmt.Sprintf( + "%s.%s.%s.connections.liqo.io", + options.Name, options.Namespace, options.Mode, + ), + LeaderElectionNamespace: options.Namespace, + LeaderElectionReleaseOnCancel: true, + LeaderElectionResourceLock: resourcelock.LeasesResourceLock, + LeaseDuration: &options.LeaderElectionLeaseDuration, + RenewDeadline: &options.LeaderElectionRenewDeadline, + RetryPeriod: &options.LeaderElectionRetryPeriod, + }) + if err != nil { + return fmt.Errorf("unable to create manager: %w", err) + } + + // Setup the controller. + connr, err := connection.NewConnectionsReconciler( + mgr.GetClient(), + mgr.GetScheme(), + mgr.GetEventRecorderFor("connections-controller"), + options, + ) + if err != nil { + return fmt.Errorf("unable to create connectioons reconciler: %w", err) + } + + // Setup the controller. + if err = connr.SetupWithManager(mgr); err != nil { + return fmt.Errorf("unable to setup connections reconciler: %w", err) + } + + // Start the manager. + return mgr.Start(ctx) +} diff --git a/cmd/liqonet/gateway-operator.go b/cmd/liqonet/gateway-operator.go index 468574e893..b416fde7b7 100644 --- a/cmd/liqonet/gateway-operator.go +++ b/cmd/liqonet/gateway-operator.go @@ -34,7 +34,7 @@ import ( tunneloperator "github.com/liqotech/liqo/internal/liqonet/tunnel-operator" liqoconst "github.com/liqotech/liqo/pkg/consts" - "github.com/liqotech/liqo/pkg/liqonet/conncheck" + "github.com/liqotech/liqo/pkg/gateway/connection/conncheck" liqonetns "github.com/liqotech/liqo/pkg/liqonet/netns" liqonetutils "github.com/liqotech/liqo/pkg/liqonet/utils" "github.com/liqotech/liqo/pkg/liqonet/utils/links" diff --git a/deployments/liqo/charts/liqo-crds/crds/networking.liqo.io_connections.yaml b/deployments/liqo/charts/liqo-crds/crds/networking.liqo.io_connections.yaml index 48c7509a63..56185f88bf 100644 --- a/deployments/liqo/charts/liqo-crds/crds/networking.liqo.io_connections.yaml +++ b/deployments/liqo/charts/liqo-crds/crds/networking.liqo.io_connections.yaml @@ -16,7 +16,21 @@ spec: singular: connection scope: Namespaced versions: - - name: v1alpha1 + - additionalPrinterColumns: + - jsonPath: .spec.type + name: Type + type: string + - jsonPath: .status.latency.value + name: Latency + priority: 1 + type: string + - jsonPath: .status.value + name: Status + type: string + - jsonPath: .metadata.creationTimestamp + name: Age + type: date + name: v1alpha1 schema: openAPIV3Schema: description: Connection contains the status of a connection between two clusters @@ -74,35 +88,6 @@ spec: type: string type: object x-kubernetes-map-type: atomic - ping: - description: Ping specifies the ping configuration. - properties: - enabled: - default: true - description: Enabled specifies whether the ping is enabled or - not. - type: boolean - endpoint: - description: Endpoint specifies the endpoint to ping. - properties: - addresses: - description: Addresses specifies the addresses of the endpoint. - items: - type: string - type: array - port: - description: Port specifies the port of the endpoint. - format: int32 - type: integer - protocol: - default: TCP - description: Protocol specifies the protocol of the endpoint. - enum: - - TCP - - UDP - type: string - type: object - type: object type: description: Type of the connection. enum: @@ -116,43 +101,20 @@ spec: status: description: ConnectionStatus defines the observed state of Connection. properties: - conditions: - description: Conditions contains the conditions of the connection. - items: - description: ConnectionCondition contains details about state of - the connection. - properties: - lastTransitionTime: - description: LastTransitionTime -> timestamp for when the condition - last transitioned from one status to another. - format: date-time - type: string - message: - description: Message -> Human-readable message indicating details - about the last status transition. - type: string - reason: - description: Reason -> Machine-readable, UpperCamelCase text - indicating the reason for the condition's last transition. - type: string - status: - default: Unknown - description: Status of the condition. - enum: - - "True" - - "False" - - Unknown - type: string - type: - description: Type of the connection condition. - enum: - - Established - type: string - required: - - status - - type - type: object - type: array + latency: + description: Latency of the connection. + properties: + timestamp: + description: Timestamp of the latency. + format: date-time + type: string + value: + description: Value of the latency. + type: string + type: object + value: + description: Value of the connection. + type: string type: object type: object served: true diff --git a/deployments/liqo/files/liqo-newgateway-ClusterRole.yaml b/deployments/liqo/files/liqo-newgateway-ClusterRole.yaml index 61aa35e3fd..2bd55294fe 100644 --- a/deployments/liqo/files/liqo-newgateway-ClusterRole.yaml +++ b/deployments/liqo/files/liqo-newgateway-ClusterRole.yaml @@ -9,6 +9,16 @@ rules: - get - list - update +- apiGroups: + - networking.liqo.io + resources: + - connections + verbs: + - create + - delete + - get + - list + - update - apiGroups: - networking.liqo.io resources: diff --git a/internal/liqonet/tunnel-operator/tunnel-operator.go b/internal/liqonet/tunnel-operator/tunnel-operator.go index c8347a7724..ba97faec12 100644 --- a/internal/liqonet/tunnel-operator/tunnel-operator.go +++ b/internal/liqonet/tunnel-operator/tunnel-operator.go @@ -41,7 +41,7 @@ import ( netv1alpha1 "github.com/liqotech/liqo/apis/net/v1alpha1" liqoconst "github.com/liqotech/liqo/pkg/consts" - "github.com/liqotech/liqo/pkg/liqonet/conncheck" + "github.com/liqotech/liqo/pkg/gateway/connection/conncheck" "github.com/liqotech/liqo/pkg/liqonet/iptables" liqonetns "github.com/liqotech/liqo/pkg/liqonet/netns" liqorouting "github.com/liqotech/liqo/pkg/liqonet/routing" diff --git a/pkg/liqonet/conncheck/common.go b/pkg/gateway/connection/conncheck/common.go similarity index 100% rename from pkg/liqonet/conncheck/common.go rename to pkg/gateway/connection/conncheck/common.go diff --git a/pkg/liqonet/conncheck/conncheck.go b/pkg/gateway/connection/conncheck/conncheck.go similarity index 96% rename from pkg/liqonet/conncheck/conncheck.go rename to pkg/gateway/connection/conncheck/conncheck.go index 24cc017499..f2b1832cd2 100644 --- a/pkg/liqonet/conncheck/conncheck.go +++ b/pkg/gateway/connection/conncheck/conncheck.go @@ -73,7 +73,7 @@ func (c *ConnChecker) AddAndRunSender(clusterID, ip string, updateCallback Updat } ctxSender, cancelSender := context.WithCancel(context.Background()) - c.senders[clusterID] = NewSender(ctxSender, clusterID, cancelSender, c.conn, ip) + c.senders[clusterID] = NewSender(clusterID, cancelSender, c.conn, ip) err := c.receiver.InitPeer(clusterID, updateCallback) if err != nil { @@ -83,7 +83,7 @@ func (c *ConnChecker) AddAndRunSender(clusterID, ip string, updateCallback Updat klog.Infof("conncheck sender %s starting", clusterID) pingCallback := func(ctx context.Context) (done bool, err error) { - err = c.senders[clusterID].SendPing(ctx) + err = c.senders[clusterID].SendPing() if err != nil { klog.Warningf("failed to send ping: %s", err) } diff --git a/pkg/liqonet/conncheck/consts.go b/pkg/gateway/connection/conncheck/consts.go similarity index 100% rename from pkg/liqonet/conncheck/consts.go rename to pkg/gateway/connection/conncheck/consts.go diff --git a/pkg/liqonet/conncheck/doc.go b/pkg/gateway/connection/conncheck/doc.go similarity index 100% rename from pkg/liqonet/conncheck/doc.go rename to pkg/gateway/connection/conncheck/doc.go diff --git a/pkg/liqonet/conncheck/receiver.go b/pkg/gateway/connection/conncheck/receiver.go similarity index 97% rename from pkg/liqonet/conncheck/receiver.go rename to pkg/gateway/connection/conncheck/receiver.go index f955154ffa..8d3de3a5bc 100644 --- a/pkg/liqonet/conncheck/receiver.go +++ b/pkg/gateway/connection/conncheck/receiver.go @@ -105,7 +105,7 @@ func (r *Receiver) InitPeer(clusterID string, updateCallback UpdateFunc) error { // Run starts the receiver. func (r *Receiver) Run() { - klog.V(8).Infof("conncheck receiver: starting") + klog.Infof("conncheck receiver: started") for { n, raddr, err := r.conn.ReadFromUDP(r.buff) if err != nil { @@ -135,7 +135,7 @@ func (r *Receiver) Run() { // RunDisconnectObserver starts the disconnect observer. func (r *Receiver) RunDisconnectObserver() { - klog.V(9).Infof("conncheck receiver disconnect checker: starting") + klog.Infof("conncheck receiver disconnect checker: started") // Ignore errors because only caused by context cancellation. _ = wait.PollImmediateInfiniteWithContext(context.Background(), time.Duration(PingLossThreshold)*PingInterval/10, func(ctx context.Context) (done bool, err error) { diff --git a/pkg/liqonet/conncheck/sender.go b/pkg/gateway/connection/conncheck/sender.go similarity index 89% rename from pkg/liqonet/conncheck/sender.go rename to pkg/gateway/connection/conncheck/sender.go index dd82198b32..9c35a37bdb 100644 --- a/pkg/liqonet/conncheck/sender.go +++ b/pkg/gateway/connection/conncheck/sender.go @@ -15,7 +15,6 @@ package conncheck import ( - "context" "encoding/json" "fmt" "net" @@ -33,7 +32,7 @@ type Sender struct { } // NewSender creates a new conncheck sender. -func NewSender(ctx context.Context, clusterID string, cancel func(), conn *net.UDPConn, ip string) *Sender { +func NewSender(clusterID string, cancel func(), conn *net.UDPConn, ip string) *Sender { return &Sender{ clusterID: clusterID, cancel: cancel, @@ -43,7 +42,7 @@ func NewSender(ctx context.Context, clusterID string, cancel func(), conn *net.U } // SendPing sends a PING message to the given address. -func (s *Sender) SendPing(ctx context.Context) error { +func (s *Sender) SendPing() error { msgOut := Msg{ClusterID: s.clusterID, MsgType: PING, TimeStamp: time.Now()} b, err := json.Marshal(msgOut) if err != nil { diff --git a/pkg/gateway/connection/connections-controller.go b/pkg/gateway/connection/connections-controller.go new file mode 100644 index 0000000000..d8da5bfca3 --- /dev/null +++ b/pkg/gateway/connection/connections-controller.go @@ -0,0 +1,104 @@ +// Copyright 2019-2023 The Liqo Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package connection + +import ( + "context" + "fmt" + "time" + + apierrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/client-go/tools/record" + "k8s.io/klog/v2" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/builder" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/predicate" + + networkingv1alpha1 "github.com/liqotech/liqo/apis/networking/v1alpha1" + "github.com/liqotech/liqo/pkg/gateway/connection/conncheck" + "github.com/liqotech/liqo/pkg/gateway/tunnel/common" +) + +// cluster-role +// +kubebuilder:rbac:groups=networking.liqo.io,resources=connections,verbs=get;list;create;delete;update + +// ConnectionsReconciler updates the PublicKey resource used to establish the Wireguard connection. +type ConnectionsReconciler struct { + ConnChecker *conncheck.ConnChecker + Client client.Client + Scheme *runtime.Scheme + EventsRecorder record.EventRecorder + Options *Options +} + +// NewConnectionsReconciler returns a new PublicKeysReconciler. +func NewConnectionsReconciler(cl client.Client, s *runtime.Scheme, er record.EventRecorder, options *Options) (*ConnectionsReconciler, error) { + connchecker, err := conncheck.NewConnChecker() + if err != nil { + return nil, fmt.Errorf("unable to create the connection checker: %w", err) + } + go connchecker.RunReceiver() + go connchecker.RunReceiverDisconnectObserver() + return &ConnectionsReconciler{ + ConnChecker: connchecker, + Client: cl, + Scheme: s, + EventsRecorder: er, + Options: options, + }, nil +} + +// Reconcile manage PublicKey resources. +func (r *ConnectionsReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { + connection := &networkingv1alpha1.Connection{} + if err := r.Client.Get(ctx, req.NamespacedName, connection); err != nil { + if apierrors.IsNotFound(err) { + klog.Infof("There is no connection %s", req.String()) + return ctrl.Result{}, nil + } + return ctrl.Result{}, fmt.Errorf("unable to get the connection %q: %w", req.NamespacedName, err) + } + + r.ConnChecker.AddAndRunSender(r.Options.RemoteClusterID, common.GetInterfaceIP(r.Options.Mode), ForgeUpdateConnectionCallback(ctx, r.Client, req)) + return ctrl.Result{}, nil +} + +// SetupWithManager register the ConfigurationReconciler to the manager. +func (r *ConnectionsReconciler) SetupWithManager(mgr ctrl.Manager) error { + return ctrl.NewControllerManagedBy(mgr). + For(&networkingv1alpha1.Connection{}, r.Predicates()). + Complete(r) +} + +// Predicates returns the predicates required for the PublicKey controller. +func (r *ConnectionsReconciler) Predicates() builder.Predicates { + return builder.WithPredicates( + predicate.NewPredicateFuncs(func(object client.Object) bool { + return true + })) +} + +// ForgeUpdateConnectionCallback forges the UpdateConnectionStatus function. +func ForgeUpdateConnectionCallback(ctx context.Context, cl client.Client, req ctrl.Request) conncheck.UpdateFunc { + return func(connected bool, latency time.Duration, timestamp time.Time) error { + connection := &networkingv1alpha1.Connection{} + if err := cl.Get(ctx, req.NamespacedName, connection); err != nil { + return err + } + return UpdateConnectionStatus(ctx, cl, connection, connected, latency, timestamp) + } +} diff --git a/pkg/gateway/connection/flags.go b/pkg/gateway/connection/flags.go new file mode 100644 index 0000000000..b8e3336c8c --- /dev/null +++ b/pkg/gateway/connection/flags.go @@ -0,0 +1,109 @@ +// Copyright 2019-2023 The Liqo Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package connection + +import ( + "time" + + "github.com/spf13/cobra" + "github.com/spf13/pflag" + + "github.com/liqotech/liqo/pkg/gateway/connection/conncheck" +) + +// FlagName is the type for the name of the flags. +type FlagName string + +func (fn FlagName) String() string { + return string(fn) +} + +const ( + // FlagNameName is the name of the wireguard interface. + FlagNameName FlagName = "name" + // FlagNameNamespace is the namespace of the wireguard interface. + FlagNameNamespace FlagName = "namespace" + // FlagNameRemoteClusterID is the clusterID of the remote cluster. + FlagNameRemoteClusterID FlagName = "remote-cluster-id" + + // FlagNameMode is the mode in which the wireguard interface is configured. + FlagNameMode FlagName = "mode" + + // FlagNamePingLatencyUpdateInterval is the interval at which the gateway operator updates the latency value in the status of the tunnel-endpoint. + FlagNamePingLatencyUpdateInterval FlagName = "ping-latency-update-interval" + // FlagNamePingLossThreshold is the number of lost packets after which the connection check is considered as failed. + FlagNamePingLossThreshold FlagName = "ping-loss-threshold" + // FlagNamePingInterval is the interval at which the ping is sent. + FlagNamePingInterval FlagName = "ping-interval" + + // FlagNameLeaderElection is the flag to enable leader election. + FlagNameLeaderElection FlagName = "leader-election" + // FlagNameLeaderElectionLeaseDuration is the lease duration for the leader election. + FlagNameLeaderElectionLeaseDuration FlagName = "leader-election-lease-duration" + // FlagNameLeaderElectionRenewDeadline is the renew deadline for the leader election. + FlagNameLeaderElectionRenewDeadline FlagName = "leader-election-renew-deadline" + // FlagNameLeaderElectionRetryPeriod is the retry period for the leader election. + FlagNameLeaderElectionRetryPeriod FlagName = "leader-election-retry-period" + + // FlagNameMetricsAddress is the address for the metrics endpoint. + FlagNameMetricsAddress FlagName = "metrics-address" + // FlagNameProbeAddr is the address for the health probe endpoint. + FlagNameProbeAddr FlagName = "health-probe-bind-address" +) + +// RequiredFlags contains the list of the mandatory flags. +var RequiredFlags = []FlagName{ + FlagNameName, + FlagNameNamespace, + FlagNameRemoteClusterID, + FlagNameMode, +} + +// InitFlags initializes the flags for the wireguard tunnel. +func InitFlags(flagset *pflag.FlagSet, opts *Options) { + flagset.StringVar(&opts.Name, FlagNameName.String(), "", "Name for the wireguard interface") + flagset.StringVar(&opts.Namespace, FlagNameNamespace.String(), "", "Namespace for the wireguard interface") + flagset.StringVar(&opts.RemoteClusterID, FlagNameRemoteClusterID.String(), "", "ClusterID for the wireguard interface") + + flagset.Var(&opts.Mode, FlagNameMode.String(), "Mode for the wireguard interface") + + flagset.DurationVar(&opts.PingLatencyUpdateInterval, FlagNamePingLatencyUpdateInterval.String(), 30*time.Second, + "Interval at which the gateway operator updates the latency value in the status of the tunnel-endpoint") + flagset.UintVar(&conncheck.PingLossThreshold, FlagNamePingLossThreshold.String(), 5, + "Number of lost packets after which the connection check is considered as failed.") + flagset.DurationVar(&conncheck.PingInterval, FlagNamePingInterval.String(), 2*time.Second, + "Interval at which the ping is sent.") + + flagset.BoolVar(&opts.LeaderElection, FlagNameLeaderElection.String(), false, "Enable leader election") + flagset.DurationVar(&opts.LeaderElectionLeaseDuration, FlagNameLeaderElectionLeaseDuration.String(), 15*time.Second, + "LeaseDuration for the leader election") + flagset.DurationVar(&opts.LeaderElectionRenewDeadline, FlagNameLeaderElectionRenewDeadline.String(), 10*time.Second, + "RenewDeadline for the leader election") + flagset.DurationVar(&opts.LeaderElectionRetryPeriod, FlagNameLeaderElectionRetryPeriod.String(), 2*time.Second, + "RetryPeriod for the leader election") + + flagset.StringVar(&opts.MetricsAddress, FlagNameMetricsAddress.String(), ":8080", "Address for the metrics endpoint") + flagset.StringVar(&opts.ProbeAddr, FlagNameProbeAddr.String(), ":8081", "Address for the health probe endpoint") +} + +// MarkFlagsRequired marks the flags as required. +func MarkFlagsRequired(cmd *cobra.Command) error { + for _, flag := range RequiredFlags { + if err := cmd.MarkFlagRequired(flag.String()); err != nil { + return err + } + } + return nil +} diff --git a/pkg/gateway/connection/k8s.go b/pkg/gateway/connection/k8s.go index e967562067..c544c374ad 100644 --- a/pkg/gateway/connection/k8s.go +++ b/pkg/gateway/connection/k8s.go @@ -16,40 +16,25 @@ package connection import ( "context" + "time" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "sigs.k8s.io/controller-runtime/pkg/client" networkingv1alpha1 "github.com/liqotech/liqo/apis/networking/v1alpha1" + liqonetutils "github.com/liqotech/liqo/pkg/liqonet/utils" ) -// forgeConnectionStatus forges the status of the connection. -func forgeConnectionStatus(conn *networkingv1alpha1.Connection, - condType *networkingv1alpha1.ConnectionConditionType, - condStatus *networkingv1alpha1.ConnectionConditionStatusType) (update bool) { - update = false - for i := range conn.Status.Conditions { - if conn.Status.Conditions[i].Type == *condType { - if conn.Status.Conditions[i].Status != *condStatus { - update = true - conn.Status.Conditions[i].Status = *condStatus - conn.Status.Conditions[i].Message = "" - conn.Status.Conditions[i].Reason = "" - conn.Status.Conditions[i].LastTransitionTime = metav1.Now() - } - return update - } - } - return update -} - // UpdateConnectionStatus updates the status of the connection. func UpdateConnectionStatus(ctx context.Context, cl client.Client, conn *networkingv1alpha1.Connection, - condType *networkingv1alpha1.ConnectionConditionType, - condStatus *networkingv1alpha1.ConnectionConditionStatusType) error { - update := forgeConnectionStatus(conn, condType, condStatus) - if !update { - return nil + connected bool, latencyValue time.Duration, latencyTimestamp time.Time) error { + switch connected { + case true: + conn.Status.Value = networkingv1alpha1.Connected + case false: + conn.Status.Value = networkingv1alpha1.Connecting } + conn.Status.Latency.Value = liqonetutils.FormatLatency(latencyValue) + conn.Status.Latency.Timestamp = metav1.Time{Time: latencyTimestamp} return cl.Status().Update(ctx, conn) } diff --git a/pkg/gateway/connection/options.go b/pkg/gateway/connection/options.go new file mode 100644 index 0000000000..3dd6a73182 --- /dev/null +++ b/pkg/gateway/connection/options.go @@ -0,0 +1,47 @@ +// Copyright 2019-2023 The Liqo Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package connection + +import ( + "time" + + "github.com/liqotech/liqo/pkg/gateway/tunnel/common" +) + +// Options contains the options for the wireguard interface. +type Options struct { + Name string + Namespace string + RemoteClusterID string + + Mode common.Mode + + LeaderElection bool + LeaderElectionLeaseDuration time.Duration + LeaderElectionRenewDeadline time.Duration + LeaderElectionRetryPeriod time.Duration + + PingLatencyUpdateInterval time.Duration + PingLossThreshold uint + PingInterval time.Duration + + MetricsAddress string + ProbeAddr string +} + +// NewOptions returns a new Options struct. +func NewOptions() *Options { + return &Options{} +} diff --git a/pkg/gateway/tunnel/common/netlink.go b/pkg/gateway/tunnel/common/netlink.go index f784928059..6deefbb9fa 100644 --- a/pkg/gateway/tunnel/common/netlink.go +++ b/pkg/gateway/tunnel/common/netlink.go @@ -16,6 +16,13 @@ package common import "github.com/vishvananda/netlink" +const ( + // ServerInterfaceIP is the IP address of the Wireguard interface in server mode. + ServerInterfaceIP = "169.254.0.1/30" + // ClientInterfaceIP is the IP address of the Wireguard interface in client mode. + ClientInterfaceIP = "169.254.0.2/30" +) + // AddAddress adds an IP address to the Wireguard interface. func AddAddress(link netlink.Link, ip string) error { addr, err := netlink.ParseAddr(ip) @@ -30,3 +37,14 @@ func AddAddress(link netlink.Link, ip string) error { func GetLink(name string) (netlink.Link, error) { return netlink.LinkByName(name) } + +// GetInterfaceIP returns the IP address of the Wireguard interface. +func GetInterfaceIP(mode Mode) string { + switch mode { + case ModeServer: + return ServerInterfaceIP + case ModeClient: + return ClientInterfaceIP + } + return "" +} diff --git a/pkg/gateway/tunnel/wireguard/k8s.go b/pkg/gateway/tunnel/wireguard/k8s.go index b4fcd8a1d7..0479882140 100644 --- a/pkg/gateway/tunnel/wireguard/k8s.go +++ b/pkg/gateway/tunnel/wireguard/k8s.go @@ -118,8 +118,6 @@ func (r *PublicKeysReconciler) EnsureConnection(ctx context.Context) error { conn.Spec.Type = networkingv1alpha1.ConnectionTypeClient conn.Spec.GatewayRef.Kind = networkingv1alpha1.WgGatewayClientKind } - // We need to create an IP resource, maybe it' better to do it in the connection controller. - conn.Spec.Ping.Endpoint = networkingv1alpha1.EndpointStatus{} return nil }) diff --git a/pkg/gateway/tunnel/wireguard/netlink.go b/pkg/gateway/tunnel/wireguard/netlink.go index b4070987f4..5321b7a67a 100644 --- a/pkg/gateway/tunnel/wireguard/netlink.go +++ b/pkg/gateway/tunnel/wireguard/netlink.go @@ -23,13 +23,6 @@ import ( "github.com/liqotech/liqo/pkg/gateway/tunnel/common" ) -const ( - // ServerInterfaceIP is the IP address of the Wireguard interface in server mode. - ServerInterfaceIP = "169.254.0.1/30" - // ClientInterfaceIP is the IP address of the Wireguard interface in client mode. - ClientInterfaceIP = "169.254.0.2/30" -) - // InitWireguardLink inits the Wireguard interface. func InitWireguardLink(options *Options) error { if err := createLink(options); err != nil { @@ -41,25 +34,14 @@ func InitWireguardLink(options *Options) error { return err } - klog.Infof("Setting up Wireguard interface %q with IP %q", options.InterfaceName, GetInterfaceIP(options.Mode)) - if err := common.AddAddress(link, GetInterfaceIP(options.Mode)); err != nil { + klog.Infof("Setting up Wireguard interface %q with IP %q", options.InterfaceName, common.GetInterfaceIP(options.Mode)) + if err := common.AddAddress(link, common.GetInterfaceIP(options.Mode)); err != nil { return err } return netlink.LinkSetUp(link) } -// GetInterfaceIP returns the IP address of the Wireguard interface. -func GetInterfaceIP(mode common.Mode) string { - switch mode { - case common.ModeServer: - return ServerInterfaceIP - case common.ModeClient: - return ClientInterfaceIP - } - return "" -} - // CreateLink creates a new Wireguard interface. func createLink(options *Options) error { link := netlink.Wireguard{ diff --git a/pkg/liqonet/tunnel/driver.go b/pkg/liqonet/tunnel/driver.go index 3bce59c83d..d6389c8f23 100644 --- a/pkg/liqonet/tunnel/driver.go +++ b/pkg/liqonet/tunnel/driver.go @@ -21,7 +21,7 @@ import ( "k8s.io/klog/v2" netv1alpha1 "github.com/liqotech/liqo/apis/net/v1alpha1" - "github.com/liqotech/liqo/pkg/liqonet/conncheck" + "github.com/liqotech/liqo/pkg/gateway/connection/conncheck" ) // DriverCreateFunc function prototype to create a new driver. diff --git a/pkg/liqonet/tunnel/wireguard/driver.go b/pkg/liqonet/tunnel/wireguard/driver.go index 16e872e9de..0f374d415d 100644 --- a/pkg/liqonet/tunnel/wireguard/driver.go +++ b/pkg/liqonet/tunnel/wireguard/driver.go @@ -42,7 +42,7 @@ import ( discv1alpha1 "github.com/liqotech/liqo/apis/discovery/v1alpha1" netv1alpha1 "github.com/liqotech/liqo/apis/net/v1alpha1" liqoconst "github.com/liqotech/liqo/pkg/consts" - "github.com/liqotech/liqo/pkg/liqonet/conncheck" + "github.com/liqotech/liqo/pkg/gateway/connection/conncheck" "github.com/liqotech/liqo/pkg/liqonet/tunnel" "github.com/liqotech/liqo/pkg/liqonet/tunnel/metrics" "github.com/liqotech/liqo/pkg/liqonet/tunnel/resolver"