From 624997cd14272bb6c39352ab16cd0daea5faa8fb Mon Sep 17 00:00:00 2001 From: Francesco Cheinasso Date: Tue, 10 Oct 2023 14:02:16 +0200 Subject: [PATCH] External Network: connection controller --- .github/dependabot.yml | 7 +- .github/workflows/integration.yml | 2 +- build/gateway/Dockerfile | 20 +++ cmd/gateway/main.go | 135 +++++++++++++++++ cmd/gateway/wireguard/main.go | 25 ++-- cmd/liqonet/gateway-operator.go | 2 +- .../liqo/files/liqo-gateway-ClusterRole.yaml | 22 ++- .../tunnel-operator/tunnel-operator.go | 2 +- .../connection}/conncheck/common.go | 0 .../connection}/conncheck/conncheck.go | 78 +++++++--- .../connection}/conncheck/consts.go | 2 + .../connection}/conncheck/doc.go | 0 pkg/gateway/connection/conncheck/error.go | 33 +++++ .../connection}/conncheck/receiver.go | 0 .../connection}/conncheck/sender.go | 5 +- .../connection/connections_controller.go | 138 ++++++++++++++++++ pkg/gateway/connection/doc.go | 16 ++ pkg/gateway/connection/flags.go | 66 +++++++++ pkg/gateway/connection/k8s.go | 51 +++++++ pkg/gateway/doc.go | 16 ++ pkg/gateway/flags.go | 93 ++++++++++++ pkg/gateway/{tunnel/common => }/options.go | 31 +++- pkg/gateway/tunnel/common/netlink.go | 39 ++++- pkg/gateway/tunnel/wireguard/device.go | 8 +- pkg/gateway/tunnel/wireguard/dns.go | 10 +- pkg/gateway/tunnel/wireguard/flags.go | 45 +----- pkg/gateway/tunnel/wireguard/k8s.go | 47 +++--- pkg/gateway/tunnel/wireguard/labels.go | 6 +- pkg/gateway/tunnel/wireguard/netlink.go | 25 +--- pkg/gateway/tunnel/wireguard/options.go | 27 +--- ...controller.go => publickeys_controller.go} | 6 +- ...troller.go => configuration_controller.go} | 0 .../external-network/wireguard/utils.go | 4 +- pkg/liqonet/tunnel/driver.go | 2 +- pkg/liqonet/tunnel/wireguard/driver.go | 8 +- pkg/utils/time/doc.go | 16 ++ pkg/utils/time/time.go | 33 +++++ 37 files changed, 853 insertions(+), 167 deletions(-) create mode 100644 build/gateway/Dockerfile create mode 100644 cmd/gateway/main.go rename pkg/{liqonet => gateway/connection}/conncheck/common.go (100%) rename pkg/{liqonet => gateway/connection}/conncheck/conncheck.go (66%) rename pkg/{liqonet => gateway/connection}/conncheck/consts.go (88%) rename pkg/{liqonet => gateway/connection}/conncheck/doc.go (100%) create mode 100644 pkg/gateway/connection/conncheck/error.go rename pkg/{liqonet => gateway/connection}/conncheck/receiver.go (100%) rename pkg/{liqonet => gateway/connection}/conncheck/sender.go (90%) create mode 100644 pkg/gateway/connection/connections_controller.go create mode 100644 pkg/gateway/connection/doc.go create mode 100644 pkg/gateway/connection/flags.go create mode 100644 pkg/gateway/connection/k8s.go create mode 100644 pkg/gateway/doc.go create mode 100644 pkg/gateway/flags.go rename pkg/gateway/{tunnel/common => }/options.go (66%) rename pkg/gateway/tunnel/wireguard/{publickeys-controller.go => publickeys_controller.go} (96%) rename pkg/liqo-controller-manager/external-network/configuration-controller/{configuration-controller.go => configuration_controller.go} (100%) create mode 100644 pkg/utils/time/doc.go create mode 100644 pkg/utils/time/time.go diff --git a/.github/dependabot.yml b/.github/dependabot.yml index 49d5ed5ddf..bacfc47af4 100644 --- a/.github/dependabot.yml +++ b/.github/dependabot.yml @@ -36,6 +36,11 @@ updates: interval: "daily" - package-ecosystem: "docker" - directory: "/build/gateway/tunnel/wireguard" + directory: "/build/gateway" + schedule: + interval: "daily" + + - package-ecosystem: "docker" + directory: "/build/gateway/wireguard" schedule: interval: "daily" diff --git a/.github/workflows/integration.yml b/.github/workflows/integration.yml index c777332dba..6fb21f653b 100644 --- a/.github/workflows/integration.yml +++ b/.github/workflows/integration.yml @@ -12,7 +12,6 @@ on: types: - test-command - build-command - jobs: configure: name: Preliminary configuration @@ -90,6 +89,7 @@ jobs: - metric-agent - telemetry - proxy + - gateway - gateway/wireguard steps: - name: Set up QEMU diff --git a/build/gateway/Dockerfile b/build/gateway/Dockerfile new file mode 100644 index 0000000000..37a475dd31 --- /dev/null +++ b/build/gateway/Dockerfile @@ -0,0 +1,20 @@ +FROM golang:1.21 as goBuilder +WORKDIR /tmp/builder + +COPY go.mod ./go.mod +COPY go.sum ./go.sum +RUN go mod download + +COPY . ./ +RUN CGO_ENABLED=0 GOOS=linux GOARCH=$(go env GOARCH) go build -ldflags="-s -w" ./cmd/gateway + + +FROM alpine:3.18 + +RUN apk update && \ + apk add iptables bash tcpdump conntrack-tools curl iputils && \ + rm -rf /var/cache/apk/* + +COPY --from=goBuilder /tmp/builder/gateway /usr/bin/liqo-gateway + +ENTRYPOINT [ "/usr/bin/liqo-gateway" ] diff --git a/cmd/gateway/main.go b/cmd/gateway/main.go new file mode 100644 index 0000000000..8c24e38b27 --- /dev/null +++ b/cmd/gateway/main.go @@ -0,0 +1,135 @@ +// Copyright 2019-2023 The Liqo Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package wireguard contains the logic to configure the Wireguard interface. +package main + +import ( + "flag" + "fmt" + "os" + + "github.com/spf13/cobra" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/client-go/tools/leaderelection/resourcelock" + "k8s.io/klog/v2" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client/config" + "sigs.k8s.io/controller-runtime/pkg/log" + + networkingv1alpha1 "github.com/liqotech/liqo/apis/networking/v1alpha1" + "github.com/liqotech/liqo/pkg/gateway" + "github.com/liqotech/liqo/pkg/gateway/connection" + flagsutils "github.com/liqotech/liqo/pkg/utils/flags" + "github.com/liqotech/liqo/pkg/utils/mapper" + "github.com/liqotech/liqo/pkg/utils/restcfg" +) + +var ( + addToSchemeFunctions = []func(*runtime.Scheme) error{ + networkingv1alpha1.AddToScheme, + } + options = gateway.NewOptions() +) + +func main() { + var cmd = cobra.Command{ + Use: "liqo-gateway", + RunE: run, + } + + legacyflags := flag.NewFlagSet("legacy", flag.ExitOnError) + restcfg.InitFlags(legacyflags) + klog.InitFlags(legacyflags) + flagsutils.FromFlagToPflag(legacyflags, cmd.Flags()) + + gateway.InitFlags(cmd.Flags(), options) + if err := gateway.MarkFlagsRequired(&cmd); err != nil { + klog.Error(err) + os.Exit(1) + } + + connection.InitFlags(cmd.Flags()) + if err := connection.MarkFlagsRequired(&cmd); err != nil { + klog.Error(err) + os.Exit(1) + } + + if err := cmd.Execute(); err != nil { + klog.Error(err) + os.Exit(1) + } +} + +func run(_ *cobra.Command, _ []string) error { + var err error + ctx := ctrl.SetupSignalHandler() + scheme := runtime.NewScheme() + + // Adds the APIs to the scheme. + for _, addToScheme := range addToSchemeFunctions { + if err = addToScheme(scheme); err != nil { + return fmt.Errorf("unable to add scheme: %w", err) + } + } + + // Set controller-runtime logger. + log.SetLogger(klog.NewKlogr()) + + // Get the rest config. + cfg := config.GetConfigOrDie() + + // Create the manager. + mgr, err := ctrl.NewManager(cfg, ctrl.Options{ + MapperProvider: mapper.LiqoMapperProvider(scheme), + Scheme: scheme, + Namespace: options.Namespace, + MetricsBindAddress: "0", // Metrics are exposed by "connection" container. + HealthProbeBindAddress: options.ProbeAddr, + LeaderElection: options.LeaderElection, + LeaderElectionID: fmt.Sprintf( + "%s.%s.%s.connections.liqo.io", + options.Name, options.Namespace, options.Mode, + ), + LeaderElectionNamespace: options.Namespace, + LeaderElectionReleaseOnCancel: true, + LeaderElectionResourceLock: resourcelock.LeasesResourceLock, + LeaseDuration: &options.LeaderElectionLeaseDuration, + RenewDeadline: &options.LeaderElectionRenewDeadline, + RetryPeriod: &options.LeaderElectionRetryPeriod, + }) + if err != nil { + return fmt.Errorf("unable to create manager: %w", err) + } + + // Setup the controller. + connr, err := connection.NewConnectionsReconciler( + ctx, + mgr.GetClient(), + mgr.GetScheme(), + mgr.GetEventRecorderFor("connections-controller"), + options, + ) + if err != nil { + return fmt.Errorf("unable to create connectioons reconciler: %w", err) + } + + // Setup the controller. + if err = connr.SetupWithManager(mgr); err != nil { + return fmt.Errorf("unable to setup connections reconciler: %w", err) + } + + // Start the manager. + return mgr.Start(ctx) +} diff --git a/cmd/gateway/wireguard/main.go b/cmd/gateway/wireguard/main.go index 190d60d6b6..5b93d30c70 100644 --- a/cmd/gateway/wireguard/main.go +++ b/cmd/gateway/wireguard/main.go @@ -34,7 +34,7 @@ import ( ipamv1alpha1 "github.com/liqotech/liqo/apis/ipam/v1alpha1" networkingv1alpha1 "github.com/liqotech/liqo/apis/networking/v1alpha1" - "github.com/liqotech/liqo/pkg/gateway/tunnel/common" + "github.com/liqotech/liqo/pkg/gateway" "github.com/liqotech/liqo/pkg/gateway/tunnel/wireguard" flagsutils "github.com/liqotech/liqo/pkg/utils/flags" "github.com/liqotech/liqo/pkg/utils/mapper" @@ -47,7 +47,7 @@ var ( networkingv1alpha1.AddToScheme, ipamv1alpha1.AddToScheme, } - options = wireguard.NewOptions() + options = wireguard.NewOptions(gateway.NewOptions()) ) func main() { @@ -61,6 +61,7 @@ func main() { klog.InitFlags(legacyflags) flagsutils.FromFlagToPflag(legacyflags, cmd.Flags()) + gateway.InitFlags(cmd.Flags(), options.GwOptions) wireguard.InitFlags(cmd.Flags(), options) if err := wireguard.MarkFlagsRequired(&cmd, options); err != nil { klog.Error(err) @@ -103,20 +104,20 @@ func run(cmd *cobra.Command, _ []string) error { mgr, err := ctrl.NewManager(cfg, ctrl.Options{ MapperProvider: mapper.LiqoMapperProvider(scheme), Scheme: scheme, - Namespace: options.Namespace, - MetricsBindAddress: options.MetricsAddress, - HealthProbeBindAddress: options.ProbeAddr, - LeaderElection: options.LeaderElection, + Namespace: options.GwOptions.Namespace, + MetricsBindAddress: options.GwOptions.MetricsAddress, + HealthProbeBindAddress: options.GwOptions.ProbeAddr, + LeaderElection: options.GwOptions.LeaderElection, LeaderElectionID: fmt.Sprintf( "%s.%s.%s.wgtunnel.liqo.io", - wireguard.GenerateResourceName(options.Name), options.Namespace, options.Mode, + gateway.GenerateResourceName(options.GwOptions.Name), options.GwOptions.Namespace, options.GwOptions.Mode, ), - LeaderElectionNamespace: options.Namespace, + LeaderElectionNamespace: options.GwOptions.Namespace, LeaderElectionReleaseOnCancel: true, LeaderElectionResourceLock: resourcelock.LeasesResourceLock, - LeaseDuration: &options.LeaderElectionLeaseDuration, - RenewDeadline: &options.LeaderElectionRenewDeadline, - RetryPeriod: &options.LeaderElectionRetryPeriod, + LeaseDuration: &options.GwOptions.LeaderElectionLeaseDuration, + RenewDeadline: &options.GwOptions.LeaderElectionRenewDeadline, + RetryPeriod: &options.GwOptions.LeaderElectionRetryPeriod, }) if err != nil { return fmt.Errorf("unable to create manager: %w", err) @@ -134,7 +135,7 @@ func run(cmd *cobra.Command, _ []string) error { } dnsChan := make(chan event.GenericEvent) - if options.Mode == common.ModeClient { + if options.GwOptions.Mode == gateway.ModeClient { if wireguard.IsDNSRoutineRequired(options) { go wireguard.StartDNSRoutine(cmd.Context(), dnsChan, options) klog.Infof("Starting DNS routine: resolving the endpoint address every %s", options.DNSCheckInterval.String()) diff --git a/cmd/liqonet/gateway-operator.go b/cmd/liqonet/gateway-operator.go index 468574e893..b416fde7b7 100644 --- a/cmd/liqonet/gateway-operator.go +++ b/cmd/liqonet/gateway-operator.go @@ -34,7 +34,7 @@ import ( tunneloperator "github.com/liqotech/liqo/internal/liqonet/tunnel-operator" liqoconst "github.com/liqotech/liqo/pkg/consts" - "github.com/liqotech/liqo/pkg/liqonet/conncheck" + "github.com/liqotech/liqo/pkg/gateway/connection/conncheck" liqonetns "github.com/liqotech/liqo/pkg/liqonet/netns" liqonetutils "github.com/liqotech/liqo/pkg/liqonet/utils" "github.com/liqotech/liqo/pkg/liqonet/utils/links" diff --git a/deployments/liqo/files/liqo-gateway-ClusterRole.yaml b/deployments/liqo/files/liqo-gateway-ClusterRole.yaml index 61aa35e3fd..ee4c6580a6 100644 --- a/deployments/liqo/files/liqo-gateway-ClusterRole.yaml +++ b/deployments/liqo/files/liqo-gateway-ClusterRole.yaml @@ -12,10 +12,30 @@ rules: - apiGroups: - networking.liqo.io resources: - - publickeys + - connections verbs: - create - delete - get - list - update + - watch +- apiGroups: + - networking.liqo.io + resources: + - connections/status + verbs: + - get + - patch + - update +- apiGroups: + - networking.liqo.io + resources: + - publickeies + verbs: + - create + - delete + - get + - list + - update + - watch diff --git a/internal/liqonet/tunnel-operator/tunnel-operator.go b/internal/liqonet/tunnel-operator/tunnel-operator.go index 9c7e34e208..76db5c4580 100644 --- a/internal/liqonet/tunnel-operator/tunnel-operator.go +++ b/internal/liqonet/tunnel-operator/tunnel-operator.go @@ -41,7 +41,7 @@ import ( netv1alpha1 "github.com/liqotech/liqo/apis/net/v1alpha1" liqoconst "github.com/liqotech/liqo/pkg/consts" - "github.com/liqotech/liqo/pkg/liqonet/conncheck" + "github.com/liqotech/liqo/pkg/gateway/connection/conncheck" "github.com/liqotech/liqo/pkg/liqonet/iptables" liqonetns "github.com/liqotech/liqo/pkg/liqonet/netns" liqorouting "github.com/liqotech/liqo/pkg/liqonet/routing" diff --git a/pkg/liqonet/conncheck/common.go b/pkg/gateway/connection/conncheck/common.go similarity index 100% rename from pkg/liqonet/conncheck/common.go rename to pkg/gateway/connection/conncheck/common.go diff --git a/pkg/liqonet/conncheck/conncheck.go b/pkg/gateway/connection/conncheck/conncheck.go similarity index 66% rename from pkg/liqonet/conncheck/conncheck.go rename to pkg/gateway/connection/conncheck/conncheck.go index 543392c76e..84c2c3967f 100644 --- a/pkg/liqonet/conncheck/conncheck.go +++ b/pkg/gateway/connection/conncheck/conncheck.go @@ -29,9 +29,10 @@ import ( type ConnChecker struct { receiver *Receiver // key is the target cluster ID. - senders map[string]*Sender - sm sync.RWMutex - conn *net.UDPConn + senders map[string]*Sender + runningSenders map[string]*Sender + sm sync.RWMutex + conn *net.UDPConn } // NewConnChecker creates a new ConnChecker. @@ -46,9 +47,10 @@ func NewConnChecker() (*ConnChecker, error) { } klog.V(4).Infof("conncheck socket: listening on %s", addr) connChecker := ConnChecker{ - receiver: NewReceiver(conn), - senders: make(map[string]*Sender), - conn: conn, + receiver: NewReceiver(conn), + senders: make(map[string]*Sender), + runningSenders: make(map[string]*Sender), + conn: conn, } return &connChecker, nil } @@ -63,42 +65,53 @@ func (c *ConnChecker) RunReceiverDisconnectObserver(ctx context.Context) { c.receiver.RunDisconnectObserver(ctx) } -// AddAndRunSender create a new sender and runs it. -func (c *ConnChecker) AddAndRunSender(ctx context.Context, clusterID, ip string, updateCallback UpdateFunc) { +// AddSender adds a sender. +func (c *ConnChecker) AddSender(ctx context.Context, clusterID, ip string, updateCallback UpdateFunc) error { var err error + + if clusterID == "" { + return fmt.Errorf("clusterID cannot be empty") + } + c.sm.Lock() + defer c.sm.Unlock() + if _, ok := c.senders[clusterID]; ok { - c.sm.Unlock() - klog.Infof("sender %s already exists", clusterID) - return + return NewDuplicateError(clusterID) } ctxSender, cancelSender := context.WithCancel(ctx) - c.senders[clusterID], err = NewSender(clusterID, cancelSender, c.conn, ip) - + c.senders[clusterID], err = NewSender(ctxSender, clusterID, cancelSender, c.conn, ip) if err != nil { - c.sm.Unlock() - klog.Errorf("failed to create sender: %w", err) - return + return fmt.Errorf("failed to create sender: %w", err) } err = c.receiver.InitPeer(clusterID, updateCallback) if err != nil { - c.sm.Unlock() - klog.Errorf("failed to add redirect chan: %w", err) + return fmt.Errorf("failed to init peer: %w", err) } - klog.Infof("conncheck sender %q starting against %q", clusterID, ip) - pingCallback := func(ctx context.Context) (done bool, err error) { + klog.Infof("conncheck sender %q added", clusterID, ip) + return nil +} + +// RunSender runs a sender. +func (c *ConnChecker) RunSender(clusterID string) { + sender, err := c.setRunning(clusterID) + if err != nil { + klog.Errorf("conncheck sender %s doesn't start for an error: %s", clusterID, err) + return + } + + klog.Infof("conncheck sender %q starting against %q", clusterID, sender.raddr.IP.String()) + + if err := wait.PollUntilContextCancel(sender.Ctx, PingInterval, false, func(ctx context.Context) (done bool, err error) { err = c.senders[clusterID].SendPing() if err != nil { klog.Warningf("failed to send ping: %s", err) } return false, nil - } - c.sm.Unlock() - - if err := wait.PollUntilContextCancel(ctxSender, PingInterval, true, pingCallback); err != nil { + }); err != nil { klog.Errorf("conncheck sender %s stopped for an error: %s", clusterID, err) } @@ -117,6 +130,8 @@ func (c *ConnChecker) DelAndStopSender(clusterID string) { c.senders[clusterID].cancel() delete(c.senders, clusterID) } + + delete(c.runningSenders, clusterID) delete(c.receiver.peers, clusterID) } @@ -139,3 +154,18 @@ func (c *ConnChecker) GetConnected(clusterID string) (bool, error) { } return false, fmt.Errorf("sender %s not found", clusterID) } + +func (c *ConnChecker) setRunning(clusterID string) (*Sender, error) { + c.sm.Lock() + defer c.sm.Unlock() + sender, ok := c.senders[clusterID] + if !ok { + return nil, fmt.Errorf("sender %s not found", clusterID) + } + + if _, ok := c.runningSenders[clusterID]; ok { + return nil, fmt.Errorf("sender %s already running", clusterID) + } + c.runningSenders[clusterID] = sender + return sender, nil +} diff --git a/pkg/liqonet/conncheck/consts.go b/pkg/gateway/connection/conncheck/consts.go similarity index 88% rename from pkg/liqonet/conncheck/consts.go rename to pkg/gateway/connection/conncheck/consts.go index 5d54012e4b..a2a2a03a0c 100644 --- a/pkg/liqonet/conncheck/consts.go +++ b/pkg/gateway/connection/conncheck/consts.go @@ -26,4 +26,6 @@ var ( PingLossThreshold uint // PingInterval is the interval at which the ping is sent. PingInterval time.Duration + // PingUpdateStatusInterval is the interval at which the status is updated. + PingUpdateStatusInterval time.Duration ) diff --git a/pkg/liqonet/conncheck/doc.go b/pkg/gateway/connection/conncheck/doc.go similarity index 100% rename from pkg/liqonet/conncheck/doc.go rename to pkg/gateway/connection/conncheck/doc.go diff --git a/pkg/gateway/connection/conncheck/error.go b/pkg/gateway/connection/conncheck/error.go new file mode 100644 index 0000000000..28ff848b80 --- /dev/null +++ b/pkg/gateway/connection/conncheck/error.go @@ -0,0 +1,33 @@ +// Copyright 2019-2023 The Liqo Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package conncheck + +import "fmt" + +// DuplicateError is an error type for ConnChecker. +// It is returned when an already present sender is added. +type DuplicateError struct { + Err error +} + +// NewDuplicateError returns a new DuplicateError. +func NewDuplicateError(clusterID string) *DuplicateError { + return &DuplicateError{Err: fmt.Errorf("sender %s already added", clusterID)} +} + +// Error returns the error message. +func (e DuplicateError) Error() string { + return e.Err.Error() +} diff --git a/pkg/liqonet/conncheck/receiver.go b/pkg/gateway/connection/conncheck/receiver.go similarity index 100% rename from pkg/liqonet/conncheck/receiver.go rename to pkg/gateway/connection/conncheck/receiver.go diff --git a/pkg/liqonet/conncheck/sender.go b/pkg/gateway/connection/conncheck/sender.go similarity index 90% rename from pkg/liqonet/conncheck/sender.go rename to pkg/gateway/connection/conncheck/sender.go index eedbf22e43..dd63ab92f0 100644 --- a/pkg/liqonet/conncheck/sender.go +++ b/pkg/gateway/connection/conncheck/sender.go @@ -15,6 +15,7 @@ package conncheck import ( + "context" "encoding/json" "fmt" "net" @@ -25,6 +26,7 @@ import ( // Sender is a sender for the conncheck server. type Sender struct { + Ctx context.Context clusterID string cancel func() conn *net.UDPConn @@ -32,12 +34,13 @@ type Sender struct { } // NewSender creates a new conncheck sender. -func NewSender(clusterID string, cancel func(), conn *net.UDPConn, ip string) (*Sender, error) { +func NewSender(ctx context.Context, clusterID string, cancel func(), conn *net.UDPConn, ip string) (*Sender, error) { pip := net.ParseIP(ip) if pip == nil { return nil, fmt.Errorf("conncheck sender: invalid IP address %s", ip) } return &Sender{ + Ctx: ctx, clusterID: clusterID, cancel: cancel, conn: conn, diff --git a/pkg/gateway/connection/connections_controller.go b/pkg/gateway/connection/connections_controller.go new file mode 100644 index 0000000000..50b3b0a8c8 --- /dev/null +++ b/pkg/gateway/connection/connections_controller.go @@ -0,0 +1,138 @@ +// Copyright 2019-2023 The Liqo Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package connection + +import ( + "context" + "fmt" + "time" + + apierrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/client-go/tools/record" + "k8s.io/klog/v2" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/builder" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/predicate" + + networkingv1alpha1 "github.com/liqotech/liqo/apis/networking/v1alpha1" + "github.com/liqotech/liqo/pkg/consts" + "github.com/liqotech/liqo/pkg/gateway" + "github.com/liqotech/liqo/pkg/gateway/connection/conncheck" + "github.com/liqotech/liqo/pkg/gateway/tunnel/common" +) + +// cluster-role +// +kubebuilder:rbac:groups=networking.liqo.io,resources=connections,verbs=get;list;create;delete;update;watch +// +kubebuilder:rbac:groups=networking.liqo.io,resources=connections/status,verbs=get;update;patch + +// ConnectionsReconciler updates the PublicKey resource used to establish the Wireguard connection. +type ConnectionsReconciler struct { + ConnChecker *conncheck.ConnChecker + Client client.Client + Scheme *runtime.Scheme + EventsRecorder record.EventRecorder + Options *gateway.Options +} + +// NewConnectionsReconciler returns a new PublicKeysReconciler. +func NewConnectionsReconciler(ctx context.Context, cl client.Client, + s *runtime.Scheme, er record.EventRecorder, options *gateway.Options) (*ConnectionsReconciler, error) { + connchecker, err := conncheck.NewConnChecker() + if err != nil { + return nil, fmt.Errorf("unable to create the connection checker: %w", err) + } + go connchecker.RunReceiver(ctx) + go connchecker.RunReceiverDisconnectObserver(ctx) + return &ConnectionsReconciler{ + ConnChecker: connchecker, + Client: cl, + Scheme: s, + EventsRecorder: er, + Options: options, + }, nil +} + +// Reconcile manage PublicKey resources. +func (r *ConnectionsReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { + connection := &networkingv1alpha1.Connection{} + if err := r.Client.Get(ctx, req.NamespacedName, connection); err != nil { + if apierrors.IsNotFound(err) { + klog.Infof("There is no connection %s", req.String()) + return ctrl.Result{}, nil + } + return ctrl.Result{}, fmt.Errorf("unable to get the connection %q: %w", req.NamespacedName, err) + } + + remoteIP, err := common.GetRemoteInterfaceIP(r.Options.Mode) + if err != nil { + return ctrl.Result{}, fmt.Errorf("unable to get the remote interface IP: %w", err) + } + + err = r.ConnChecker.AddSender(ctx, r.Options.RemoteClusterID, remoteIP, ForgeUpdateConnectionCallback(ctx, r.Client, req)) + if err != nil { + switch err.(type) { + case *conncheck.DuplicateError: + return ctrl.Result{}, nil + default: + return ctrl.Result{}, fmt.Errorf("unable to add the sender: %w", err) + } + } + + go r.ConnChecker.RunSender(r.Options.RemoteClusterID) + + return ctrl.Result{}, nil +} + +// SetupWithManager register the ConfigurationReconciler to the manager. +func (r *ConnectionsReconciler) SetupWithManager(mgr ctrl.Manager) error { + return ctrl.NewControllerManagedBy(mgr). + For(&networkingv1alpha1.Connection{}, r.Predicates()). + Complete(r) +} + +// Predicates returns the predicates required for the PublicKey controller. +func (r *ConnectionsReconciler) Predicates() builder.Predicates { + return builder.WithPredicates( + predicate.NewPredicateFuncs(func(object client.Object) bool { + connection, ok := object.(*networkingv1alpha1.Connection) + if !ok { + return false + } + if connection.Labels == nil { + return false + } + return connection.Labels[string(consts.RemoteClusterID)] == r.Options.RemoteClusterID + })) +} + +// ForgeUpdateConnectionCallback forges the UpdateConnectionStatus function. +func ForgeUpdateConnectionCallback(ctx context.Context, cl client.Client, req ctrl.Request) conncheck.UpdateFunc { + return func(connected bool, latency time.Duration, timestamp time.Time) error { + connection := &networkingv1alpha1.Connection{} + if err := cl.Get(ctx, req.NamespacedName, connection); err != nil { + return err + } + var connStatusValue networkingv1alpha1.ConnectionStatusValue + switch connected { + case true: + connStatusValue = networkingv1alpha1.Connected + case false: + connStatusValue = networkingv1alpha1.ConnectionError + } + return UpdateConnectionStatus(ctx, cl, connection, connStatusValue, latency, timestamp) + } +} diff --git a/pkg/gateway/connection/doc.go b/pkg/gateway/connection/doc.go new file mode 100644 index 0000000000..c27703910c --- /dev/null +++ b/pkg/gateway/connection/doc.go @@ -0,0 +1,16 @@ +// Copyright 2019-2023 The Liqo Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package connection menages the connection resource. +package connection diff --git a/pkg/gateway/connection/flags.go b/pkg/gateway/connection/flags.go new file mode 100644 index 0000000000..29ee8e67d7 --- /dev/null +++ b/pkg/gateway/connection/flags.go @@ -0,0 +1,66 @@ +// Copyright 2019-2023 The Liqo Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package connection + +import ( + "time" + + "github.com/spf13/cobra" + "github.com/spf13/pflag" + + "github.com/liqotech/liqo/pkg/gateway/connection/conncheck" +) + +// FlagName is the type for the name of the flags. +type FlagName string + +func (fn FlagName) String() string { + return string(fn) +} + +const ( + // PingLossThresholdFlag is the name of the flag used to set the ping loss threshold. + PingLossThresholdFlag FlagName = "ping-loss-threshold" + // PingIntervalFlag is the name of the flag used to set the ping interval. + PingIntervalFlag FlagName = "ping-interval" + // PingUpdateStatusIntervalFlag is the name of the flag used to set the ping update status interval. + PingUpdateStatusIntervalFlag FlagName = "ping-update-status-interval" +) + +// RequiredFlags contains the list of the mandatory flags. +var RequiredFlags = []FlagName{ + PingLossThresholdFlag, + PingIntervalFlag, +} + +// InitFlags initializes the flags for the wireguard tunnel. +func InitFlags(flagset *pflag.FlagSet) { + flagset.UintVar(&conncheck.PingLossThreshold, PingLossThresholdFlag.String(), 5, + "ping-loss-threshold is the number of lost packets after which the connection check is considered as failed.") + flagset.DurationVar(&conncheck.PingInterval, PingIntervalFlag.String(), 2*time.Second, + "ping-interval is the interval between two connection checks") + flagset.DurationVar(&conncheck.PingUpdateStatusInterval, PingUpdateStatusIntervalFlag.String(), 10*time.Second, + "ping-update-status-interval is the interval at which the status is updated") +} + +// MarkFlagsRequired marks the flags as required. +func MarkFlagsRequired(cmd *cobra.Command) error { + for _, flag := range RequiredFlags { + if err := cmd.MarkFlagRequired(flag.String()); err != nil { + return err + } + } + return nil +} diff --git a/pkg/gateway/connection/k8s.go b/pkg/gateway/connection/k8s.go new file mode 100644 index 0000000000..ece23ab6fe --- /dev/null +++ b/pkg/gateway/connection/k8s.go @@ -0,0 +1,51 @@ +// Copyright 2019-2023 The Liqo Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package connection + +import ( + "context" + "fmt" + "time" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/klog/v2" + "sigs.k8s.io/controller-runtime/pkg/client" + + networkingv1alpha1 "github.com/liqotech/liqo/apis/networking/v1alpha1" + "github.com/liqotech/liqo/pkg/gateway/connection/conncheck" + timeutils "github.com/liqotech/liqo/pkg/utils/time" +) + +// UpdateConnectionStatus updates the status of a connection. +func UpdateConnectionStatus(ctx context.Context, cl client.Client, connection *networkingv1alpha1.Connection, + value networkingv1alpha1.ConnectionStatusValue, latency time.Duration, timestamp time.Time) error { + if connection.Status.Value != value || + timestamp.Sub(connection.Status.Latency.Timestamp.Time) > conncheck.PingUpdateStatusInterval { + if connection.Status.Value != value { + klog.Infof("changing connection %q status to %q", + client.ObjectKeyFromObject(connection).String(), value) + } + connection.Status.Latency = networkingv1alpha1.ConnectionLatency{ + Value: timeutils.FormatLatency(latency), + Timestamp: metav1.NewTime(timestamp), + } + connection.Status.Value = value + if err := cl.Status().Update(ctx, connection); err != nil { + return fmt.Errorf("unable to update connection %q: %w", + client.ObjectKeyFromObject(connection).String(), err) + } + } + return nil +} diff --git a/pkg/gateway/doc.go b/pkg/gateway/doc.go new file mode 100644 index 0000000000..791e3cf81f --- /dev/null +++ b/pkg/gateway/doc.go @@ -0,0 +1,16 @@ +// Copyright 2019-2023 The Liqo Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package gateway contains the gateway controllers and utilities. +package gateway diff --git a/pkg/gateway/flags.go b/pkg/gateway/flags.go new file mode 100644 index 0000000000..2e91467049 --- /dev/null +++ b/pkg/gateway/flags.go @@ -0,0 +1,93 @@ +// Copyright 2019-2023 The Liqo Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package gateway + +import ( + "time" + + "github.com/spf13/cobra" + "github.com/spf13/pflag" +) + +// FlagName is the type for the name of the flags. +type FlagName string + +func (fn FlagName) String() string { + return string(fn) +} + +const ( + // FlagNameName is the name of the WgGateway resource. + FlagNameName FlagName = "name" + // FlagNameNamespace is the namespace WgGateway resource. + FlagNameNamespace FlagName = "namespace" + // FlagNameRemoteClusterID is the clusterID of the remote cluster. + FlagNameRemoteClusterID FlagName = "remote-cluster-id" + + // FlagNameMode is the mode in which the wireguard interface is configured. + FlagNameMode FlagName = "mode" + + // FlagNameLeaderElection is the flag to enable leader election. + FlagNameLeaderElection FlagName = "leader-election" + // FlagNameLeaderElectionLeaseDuration is the lease duration for the leader election. + FlagNameLeaderElectionLeaseDuration FlagName = "leader-election-lease-duration" + // FlagNameLeaderElectionRenewDeadline is the renew deadline for the leader election. + FlagNameLeaderElectionRenewDeadline FlagName = "leader-election-renew-deadline" + // FlagNameLeaderElectionRetryPeriod is the retry period for the leader election. + FlagNameLeaderElectionRetryPeriod FlagName = "leader-election-retry-period" + + // FlagNameMetricsAddress is the address for the metrics endpoint. + FlagNameMetricsAddress FlagName = "metrics-address" + // FlagNameProbeAddr is the address for the health probe endpoint. + FlagNameProbeAddr FlagName = "health-probe-bind-address" +) + +// RequiredFlags contains the list of the mandatory flags. +var RequiredFlags = []FlagName{ + FlagNameName, + FlagNameNamespace, + FlagNameRemoteClusterID, + FlagNameMode, +} + +// InitFlags initializes the flags for the wireguard tunnel. +func InitFlags(flagset *pflag.FlagSet, opts *Options) { + flagset.StringVar(&opts.Name, FlagNameName.String(), "", "Parent gateway name") + flagset.StringVar(&opts.Namespace, FlagNameNamespace.String(), "", "Parent gateway namespace") + flagset.StringVar(&opts.RemoteClusterID, FlagNameRemoteClusterID.String(), "", "ClusterID of the remote cluster") + + flagset.Var(&opts.Mode, FlagNameMode.String(), "Parent gateway mode") + + flagset.BoolVar(&opts.LeaderElection, FlagNameLeaderElection.String(), false, "Enable leader election") + flagset.DurationVar(&opts.LeaderElectionLeaseDuration, FlagNameLeaderElectionLeaseDuration.String(), 15*time.Second, + "LeaseDuration for the leader election") + flagset.DurationVar(&opts.LeaderElectionRenewDeadline, FlagNameLeaderElectionRenewDeadline.String(), 10*time.Second, + "RenewDeadline for the leader election") + flagset.DurationVar(&opts.LeaderElectionRetryPeriod, FlagNameLeaderElectionRetryPeriod.String(), 2*time.Second, + "RetryPeriod for the leader election") + + flagset.StringVar(&opts.MetricsAddress, FlagNameMetricsAddress.String(), ":8080", "Address for the metrics endpoint") + flagset.StringVar(&opts.ProbeAddr, FlagNameProbeAddr.String(), ":8081", "Address for the health probe endpoint") +} + +// MarkFlagsRequired marks the flags as required. +func MarkFlagsRequired(cmd *cobra.Command) error { + for _, flag := range RequiredFlags { + if err := cmd.MarkFlagRequired(flag.String()); err != nil { + return err + } + } + return nil +} diff --git a/pkg/gateway/tunnel/common/options.go b/pkg/gateway/options.go similarity index 66% rename from pkg/gateway/tunnel/common/options.go rename to pkg/gateway/options.go index 1fef1d84ef..8f0c9a9f21 100644 --- a/pkg/gateway/tunnel/common/options.go +++ b/pkg/gateway/options.go @@ -12,12 +12,35 @@ // See the License for the specific language governing permissions and // limitations under the License. -package common +package gateway import ( "fmt" + "time" ) +// Options contains the options for the wireguard interface. +type Options struct { + Name string + Namespace string + RemoteClusterID string + + Mode Mode + + LeaderElection bool + LeaderElectionLeaseDuration time.Duration + LeaderElectionRenewDeadline time.Duration + LeaderElectionRetryPeriod time.Duration + + MetricsAddress string + ProbeAddr string +} + +// NewOptions returns a new Options struct. +func NewOptions() *Options { + return &Options{} +} + // Mode is the mode in which the wireguard interface is configured. type Mode string @@ -49,3 +72,9 @@ func (m *Mode) Set(value string) error { func (m *Mode) Type() string { return "string" } + +// GenerateResourceName generates the name used for the resources created by the gateway. +// This will help if a suffix will be added to the name of the resources in future. +func GenerateResourceName(name string) string { + return name +} diff --git a/pkg/gateway/tunnel/common/netlink.go b/pkg/gateway/tunnel/common/netlink.go index f784928059..e72d0546f9 100644 --- a/pkg/gateway/tunnel/common/netlink.go +++ b/pkg/gateway/tunnel/common/netlink.go @@ -14,7 +14,20 @@ package common -import "github.com/vishvananda/netlink" +import ( + "fmt" + + "github.com/vishvananda/netlink" + + "github.com/liqotech/liqo/pkg/gateway" +) + +const ( + // ServerInterfaceIP is the IP address of the Wireguard interface in server mode. + ServerInterfaceIP = "169.254.0.1/30" + // ClientInterfaceIP is the IP address of the Wireguard interface in client mode. + ClientInterfaceIP = "169.254.0.2/30" +) // AddAddress adds an IP address to the Wireguard interface. func AddAddress(link netlink.Link, ip string) error { @@ -30,3 +43,27 @@ func AddAddress(link netlink.Link, ip string) error { func GetLink(name string) (netlink.Link, error) { return netlink.LinkByName(name) } + +// GetInterfaceIP returns the IP address of the Wireguard interface. +func GetInterfaceIP(mode gateway.Mode) string { + switch mode { + case gateway.ModeServer: + return ServerInterfaceIP + case gateway.ModeClient: + return ClientInterfaceIP + } + return "" +} + +// GetRemoteInterfaceIP returns the IP address of the remote Wireguard interface. +func GetRemoteInterfaceIP(mode gateway.Mode) (string, error) { + switch mode { + case gateway.ModeServer: + ip, err := netlink.ParseIPNet(ClientInterfaceIP) + return ip.IP.String(), err + case gateway.ModeClient: + ip, err := netlink.ParseIPNet(ServerInterfaceIP) + return ip.IP.String(), err + } + return "", fmt.Errorf("invalid mode %v", mode) +} diff --git a/pkg/gateway/tunnel/wireguard/device.go b/pkg/gateway/tunnel/wireguard/device.go index 10ef54bf5b..853e87e313 100644 --- a/pkg/gateway/tunnel/wireguard/device.go +++ b/pkg/gateway/tunnel/wireguard/device.go @@ -20,7 +20,7 @@ import ( "golang.zx2c4.com/wireguard/wgctrl" "golang.zx2c4.com/wireguard/wgctrl/wgtypes" - "github.com/liqotech/liqo/pkg/gateway/tunnel/common" + "github.com/liqotech/liqo/pkg/gateway" ) func configureDevice(wgcl *wgctrl.Client, options *Options, peerPubKey wgtypes.Key) error { @@ -36,10 +36,10 @@ func configureDevice(wgcl *wgctrl.Client, options *Options, peerPubKey wgtypes.K ReplacePeers: true, } - switch options.Mode { - case common.ModeServer: + switch options.GwOptions.Mode { + case gateway.ModeServer: confdev.ListenPort = &options.ListenPort - case common.ModeClient: + case gateway.ModeClient: confdev.Peers[0].Endpoint = &net.UDPAddr{ IP: options.EndpointIP, Port: options.EndpointPort, diff --git a/pkg/gateway/tunnel/wireguard/dns.go b/pkg/gateway/tunnel/wireguard/dns.go index 840e20007d..43d1c69f71 100644 --- a/pkg/gateway/tunnel/wireguard/dns.go +++ b/pkg/gateway/tunnel/wireguard/dns.go @@ -31,7 +31,7 @@ import ( "sigs.k8s.io/controller-runtime/pkg/source" "github.com/liqotech/liqo/pkg/consts" - "github.com/liqotech/liqo/pkg/gateway/tunnel/common" + "github.com/liqotech/liqo/pkg/gateway" "github.com/liqotech/liqo/pkg/utils/getters" ) @@ -88,7 +88,7 @@ func StartDNSRoutine(ctx context.Context, ch chan event.GenericEvent, opts *Opti // IsDNSRoutineRequired checks if the client endpoint is a DNS. // If it is a DNS the DNS routine is required. func IsDNSRoutineRequired(opts *Options) bool { - if opts.Mode != common.ModeClient { + if opts.GwOptions.Mode != gateway.ModeClient { return false } return net.ParseIP(opts.EndpointAddress) == nil @@ -106,10 +106,10 @@ func NewDNSEventHandler(cl client.Client, opts *Options) handler.EventHandler { return handler.EnqueueRequestsFromMapFunc( func(ctx context.Context, _ client.Object) []reconcile.Request { labelSet := labels.Set{ - string(LabelsMode): string(opts.Mode), - string(consts.RemoteClusterID): opts.RemoteClusterID, + string(LabelsMode): string(opts.GwOptions.Mode), + string(consts.RemoteClusterID): opts.GwOptions.RemoteClusterID, } - list, err := getters.ListPublicKeysByLabel(ctx, cl, opts.Namespace, labels.SelectorFromSet(labelSet)) + list, err := getters.ListPublicKeysByLabel(ctx, cl, opts.GwOptions.Namespace, labels.SelectorFromSet(labelSet)) if err != nil { klog.Error(err) } diff --git a/pkg/gateway/tunnel/wireguard/flags.go b/pkg/gateway/tunnel/wireguard/flags.go index d97ff22452..622c14a925 100644 --- a/pkg/gateway/tunnel/wireguard/flags.go +++ b/pkg/gateway/tunnel/wireguard/flags.go @@ -20,7 +20,7 @@ import ( "github.com/spf13/cobra" "github.com/spf13/pflag" - "github.com/liqotech/liqo/pkg/gateway/tunnel/common" + "github.com/liqotech/liqo/pkg/gateway" ) // FlagName is the type for the name of the flags. @@ -31,17 +31,9 @@ func (fn FlagName) String() string { } const ( - // FlagNameName is the name of the WgGateway resource. - FlagNameName FlagName = "name" - // FlagNameNamespace is the namespace WgGateway resource. - FlagNameNamespace FlagName = "namespace" - // FlagNameRemoteClusterID is the clusterID of the remote cluster. - FlagNameRemoteClusterID FlagName = "remote-cluster-id" // FlagNameGatewayUID is the UID of the wireguard gateway. FlagNameGatewayUID FlagName = "gateway-uid" - // FlagNameMode is the mode in which the wireguard interface is configured. - FlagNameMode FlagName = "mode" // FlagNameMTU is the MTU for the wireguard interface. FlagNameMTU FlagName = "mtu" // FlagNameListenPort is the listen port for the wireguard interface. @@ -57,29 +49,11 @@ const ( // FlagNameDNSCheckInterval is the interval between two DNS checks. FlagNameDNSCheckInterval FlagName = "dns-check-interval" - - // FlagNameLeaderElection is the flag to enable leader election. - FlagNameLeaderElection FlagName = "leader-election" - // FlagNameLeaderElectionLeaseDuration is the lease duration for the leader election. - FlagNameLeaderElectionLeaseDuration FlagName = "leader-election-lease-duration" - // FlagNameLeaderElectionRenewDeadline is the renew deadline for the leader election. - FlagNameLeaderElectionRenewDeadline FlagName = "leader-election-renew-deadline" - // FlagNameLeaderElectionRetryPeriod is the retry period for the leader election. - FlagNameLeaderElectionRetryPeriod FlagName = "leader-election-retry-period" - - // FlagNameMetricsAddress is the address for the metrics endpoint. - FlagNameMetricsAddress FlagName = "metrics-address" - // FlagNameProbeAddr is the address for the health probe endpoint. - FlagNameProbeAddr FlagName = "health-probe-bind-address" ) // RequiredFlags contains the list of the mandatory flags. var RequiredFlags = []FlagName{ - FlagNameName, - FlagNameNamespace, - FlagNameRemoteClusterID, FlagNameGatewayUID, - FlagNameMode, } // ClientRequiredFlags contains the list of the mandatory flags for the client mode. @@ -89,12 +63,8 @@ var ClientRequiredFlags = []FlagName{ // InitFlags initializes the flags for the wireguard tunnel. func InitFlags(flagset *pflag.FlagSet, opts *Options) { - flagset.StringVar(&opts.Name, FlagNameName.String(), "", "Parent gateway name") - flagset.StringVar(&opts.Namespace, FlagNameNamespace.String(), "", "Parent gateway namespace") - flagset.StringVar(&opts.RemoteClusterID, FlagNameRemoteClusterID.String(), "", "ClusterID of the remote cluster") flagset.StringVar(&opts.GatewayUID, FlagNameGatewayUID.String(), "", "Parent gateway resource UID") - flagset.Var(&opts.Mode, FlagNameMode.String(), "Parent gateway mode") flagset.IntVar(&opts.MTU, FlagNameMTU.String(), 1420, "MTU for the interface") flagset.StringVar(&opts.InterfaceName, FlagNameInterfaceName.String(), "liqo-tunnel", "Name for the tunnel interface") flagset.IntVar(&opts.ListenPort, FlagNameListenPort.String(), 51820, "Listen port (server only)") @@ -102,17 +72,6 @@ func InitFlags(flagset *pflag.FlagSet, opts *Options) { flagset.IntVar(&opts.EndpointPort, FlagNameEndpointPort.String(), 51820, "Endpoint port (client only)") flagset.DurationVar(&opts.DNSCheckInterval, FlagNameDNSCheckInterval.String(), 5*time.Minute, "Interval between two DNS checks") - - flagset.BoolVar(&opts.LeaderElection, FlagNameLeaderElection.String(), false, "Enable leader election") - flagset.DurationVar(&opts.LeaderElectionLeaseDuration, FlagNameLeaderElectionLeaseDuration.String(), 15*time.Second, - "LeaseDuration for the leader election") - flagset.DurationVar(&opts.LeaderElectionRenewDeadline, FlagNameLeaderElectionRenewDeadline.String(), 10*time.Second, - "RenewDeadline for the leader election") - flagset.DurationVar(&opts.LeaderElectionRetryPeriod, FlagNameLeaderElectionRetryPeriod.String(), 2*time.Second, - "RetryPeriod for the leader election") - - flagset.StringVar(&opts.MetricsAddress, FlagNameMetricsAddress.String(), ":8080", "Address for the metrics endpoint") - flagset.StringVar(&opts.ProbeAddr, FlagNameProbeAddr.String(), ":8081", "Address for the health probe endpoint") } // MarkFlagsRequired marks the flags as required. @@ -122,7 +81,7 @@ func MarkFlagsRequired(cmd *cobra.Command, opts *Options) error { return err } } - if opts.Mode == common.ModeClient { + if opts.GwOptions.Mode == gateway.ModeClient { for _, flag := range ClientRequiredFlags { if err := cmd.MarkFlagRequired(flag.String()); err != nil { return err diff --git a/pkg/gateway/tunnel/wireguard/k8s.go b/pkg/gateway/tunnel/wireguard/k8s.go index 54a121ef87..52ba2af561 100644 --- a/pkg/gateway/tunnel/wireguard/k8s.go +++ b/pkg/gateway/tunnel/wireguard/k8s.go @@ -28,29 +28,32 @@ import ( networkingv1alpha1 "github.com/liqotech/liqo/apis/networking/v1alpha1" "github.com/liqotech/liqo/pkg/consts" - "github.com/liqotech/liqo/pkg/gateway/tunnel/common" + "github.com/liqotech/liqo/pkg/gateway" ) // SetOwnerReferenceWithMode sets the owner reference of the object according to the mode. func SetOwnerReferenceWithMode(opts *Options, obj metav1.Object, scheme *runtime.Scheme) error { meta := metav1.ObjectMeta{ - Name: opts.Name, - Namespace: opts.Namespace, + Name: opts.GwOptions.Name, + Namespace: opts.GwOptions.Namespace, UID: types.UID(opts.GatewayUID), } - switch opts.Mode { - case common.ModeServer: + switch opts.GwOptions.Mode { + case gateway.ModeServer: return controllerutil.SetOwnerReference(&networkingv1alpha1.GatewayServer{ObjectMeta: meta}, obj, scheme) - case common.ModeClient: + case gateway.ModeClient: return controllerutil.SetOwnerReference(&networkingv1alpha1.GatewayClient{ObjectMeta: meta}, obj, scheme) } - return fmt.Errorf("invalid mode %v", opts.Mode) + return fmt.Errorf("invalid mode %v", opts.GwOptions.Mode) } // CheckKeysSecret checks if the keys secret exists and if it contains the private and public keys. func CheckKeysSecret(ctx context.Context, cl client.Client, opts *Options) (wgtypes.Key, error) { secret := &corev1.Secret{} - if err := cl.Get(ctx, types.NamespacedName{Name: GenerateResourceName(opts.Name), Namespace: opts.Namespace}, secret); err != nil { + if err := cl.Get(ctx, types.NamespacedName{ + Name: gateway.GenerateResourceName(opts.GwOptions.Name), + Namespace: opts.GwOptions.Namespace, + }, secret); err != nil { return wgtypes.Key{}, err } if secret.Data == nil { @@ -69,14 +72,14 @@ func CheckKeysSecret(ctx context.Context, cl client.Client, opts *Options) (wgty func CreateKeysSecret(ctx context.Context, cl client.Client, opts *Options, pri, pub wgtypes.Key) error { secret := &corev1.Secret{ ObjectMeta: metav1.ObjectMeta{ - Name: GenerateResourceName(opts.Name), - Namespace: opts.Namespace, + Name: gateway.GenerateResourceName(opts.GwOptions.Name), + Namespace: opts.GwOptions.Namespace, }, } if _, err := controllerutil.CreateOrUpdate(ctx, cl, secret, func() error { secret.SetLabels(map[string]string{ - string(consts.RemoteClusterID): opts.RemoteClusterID, + string(consts.RemoteClusterID): opts.GwOptions.RemoteClusterID, string(consts.GatewayResourceLabel): string(consts.GatewayResourceLabelValue), }) if err := SetOwnerReferenceWithMode(opts, secret, cl.Scheme()); err != nil { @@ -97,26 +100,34 @@ func CreateKeysSecret(ctx context.Context, cl client.Client, opts *Options, pri, // EnsureConnection creates or updates the connection resource. func EnsureConnection(ctx context.Context, cl client.Client, scheme *runtime.Scheme, opts *Options) error { conn := &networkingv1alpha1.Connection{ObjectMeta: metav1.ObjectMeta{ - Name: GenerateResourceName(opts.Name), Namespace: opts.Namespace, + Name: gateway.GenerateResourceName(opts.GwOptions.Name), Namespace: opts.GwOptions.Namespace, + Labels: map[string]string{ + string(consts.RemoteClusterID): opts.GwOptions.RemoteClusterID, + }, }} _, err := controllerutil.CreateOrUpdate(ctx, cl, conn, func() error { if err := SetOwnerReferenceWithMode(opts, conn, scheme); err != nil { return err } conn.Spec.GatewayRef.APIVersion = networkingv1alpha1.GroupVersion.String() - conn.Spec.GatewayRef.Name = opts.Name - conn.Spec.GatewayRef.Namespace = opts.Namespace + conn.Spec.GatewayRef.Name = opts.GwOptions.Name + conn.Spec.GatewayRef.Namespace = opts.GwOptions.Namespace conn.Spec.GatewayRef.UID = types.UID(opts.GatewayUID) - switch opts.Mode { - case common.ModeServer: + switch opts.GwOptions.Mode { + case gateway.ModeServer: conn.Spec.Type = networkingv1alpha1.ConnectionTypeServer conn.Spec.GatewayRef.Kind = networkingv1alpha1.WgGatewayServerKind - case common.ModeClient: + case gateway.ModeClient: conn.Spec.Type = networkingv1alpha1.ConnectionTypeClient conn.Spec.GatewayRef.Kind = networkingv1alpha1.WgGatewayClientKind } return nil }) - return err + if err != nil { + return err + } + + conn.Status.Value = networkingv1alpha1.Connecting + return cl.Status().Update(ctx, conn) } diff --git a/pkg/gateway/tunnel/wireguard/labels.go b/pkg/gateway/tunnel/wireguard/labels.go index dc5b402886..34039c66c8 100644 --- a/pkg/gateway/tunnel/wireguard/labels.go +++ b/pkg/gateway/tunnel/wireguard/labels.go @@ -14,7 +14,7 @@ package wireguard -import "github.com/liqotech/liqo/pkg/gateway/tunnel/common" +import "github.com/liqotech/liqo/pkg/gateway" // Labels is the type used to identify the wireguard labels. type Labels string @@ -29,7 +29,7 @@ const ( const ( // LabelsModeServer is the label used to identify the wireguard mode server. - LabelsModeServer LabelsValue = LabelsValue(common.ModeServer) + LabelsModeServer LabelsValue = LabelsValue(gateway.ModeServer) // LabelsModeClient is the label used to identify the wireguard mode client. - LabelsModeClient LabelsValue = LabelsValue(common.ModeClient) + LabelsModeClient LabelsValue = LabelsValue(gateway.ModeClient) ) diff --git a/pkg/gateway/tunnel/wireguard/netlink.go b/pkg/gateway/tunnel/wireguard/netlink.go index b4070987f4..a69afb9119 100644 --- a/pkg/gateway/tunnel/wireguard/netlink.go +++ b/pkg/gateway/tunnel/wireguard/netlink.go @@ -20,16 +20,10 @@ import ( "golang.zx2c4.com/wireguard/wgctrl/wgtypes" "k8s.io/klog/v2" + "github.com/liqotech/liqo/pkg/gateway" "github.com/liqotech/liqo/pkg/gateway/tunnel/common" ) -const ( - // ServerInterfaceIP is the IP address of the Wireguard interface in server mode. - ServerInterfaceIP = "169.254.0.1/30" - // ClientInterfaceIP is the IP address of the Wireguard interface in client mode. - ClientInterfaceIP = "169.254.0.2/30" -) - // InitWireguardLink inits the Wireguard interface. func InitWireguardLink(options *Options) error { if err := createLink(options); err != nil { @@ -41,25 +35,14 @@ func InitWireguardLink(options *Options) error { return err } - klog.Infof("Setting up Wireguard interface %q with IP %q", options.InterfaceName, GetInterfaceIP(options.Mode)) - if err := common.AddAddress(link, GetInterfaceIP(options.Mode)); err != nil { + klog.Infof("Setting up Wireguard interface %q with IP %q", options.InterfaceName, common.GetInterfaceIP(options.GwOptions.Mode)) + if err := common.AddAddress(link, common.GetInterfaceIP(options.GwOptions.Mode)); err != nil { return err } return netlink.LinkSetUp(link) } -// GetInterfaceIP returns the IP address of the Wireguard interface. -func GetInterfaceIP(mode common.Mode) string { - switch mode { - case common.ModeServer: - return ServerInterfaceIP - case common.ModeClient: - return ClientInterfaceIP - } - return "" -} - // CreateLink creates a new Wireguard interface. func createLink(options *Options) error { link := netlink.Wireguard{ @@ -74,7 +57,7 @@ func createLink(options *Options) error { return err } - if options.Mode == common.ModeServer { + if options.GwOptions.Mode == gateway.ModeServer { wgcl, err := wgctrl.New() if err != nil { return err diff --git a/pkg/gateway/tunnel/wireguard/options.go b/pkg/gateway/tunnel/wireguard/options.go index 669cc3c317..17c379b5ec 100644 --- a/pkg/gateway/tunnel/wireguard/options.go +++ b/pkg/gateway/tunnel/wireguard/options.go @@ -21,17 +21,15 @@ import ( "golang.zx2c4.com/wireguard/wgctrl/wgtypes" - "github.com/liqotech/liqo/pkg/gateway/tunnel/common" + "github.com/liqotech/liqo/pkg/gateway" ) // Options contains the options for the wireguard interface. type Options struct { - Name string - Namespace string - RemoteClusterID string - GatewayUID string + GwOptions *gateway.Options + + GatewayUID string - Mode common.Mode MTU int PrivateKey wgtypes.Key InterfaceName string @@ -44,25 +42,12 @@ type Options struct { EndpointIPMutex *sync.Mutex DNSCheckInterval time.Duration - - LeaderElection bool - LeaderElectionLeaseDuration time.Duration - LeaderElectionRenewDeadline time.Duration - LeaderElectionRetryPeriod time.Duration - - MetricsAddress string - ProbeAddr string } // NewOptions returns a new Options struct. -func NewOptions() *Options { +func NewOptions(options *gateway.Options) *Options { return &Options{ + GwOptions: options, EndpointIPMutex: &sync.Mutex{}, } } - -// GenerateResourceName generates the name used for the resources created by the gateway. -// This will help if a suffix will be added to the name of the resources in future. -func GenerateResourceName(name string) string { - return name -} diff --git a/pkg/gateway/tunnel/wireguard/publickeys-controller.go b/pkg/gateway/tunnel/wireguard/publickeys_controller.go similarity index 96% rename from pkg/gateway/tunnel/wireguard/publickeys-controller.go rename to pkg/gateway/tunnel/wireguard/publickeys_controller.go index 17184430e7..c1cfc8041b 100644 --- a/pkg/gateway/tunnel/wireguard/publickeys-controller.go +++ b/pkg/gateway/tunnel/wireguard/publickeys_controller.go @@ -35,7 +35,7 @@ import ( ) // cluster-role -// +kubebuilder:rbac:groups=networking.liqo.io,resources=publickeys,verbs=get;list;create;delete;update +// +kubebuilder:rbac:groups=networking.liqo.io,resources=publickeies,verbs=get;list;create;delete;update;watch // PublicKeysReconciler updates the PublicKey resource used to establish the Wireguard connection. type PublicKeysReconciler struct { @@ -95,13 +95,13 @@ func (r *PublicKeysReconciler) Predicates() builder.Predicates { if !ok { return false } - if mode != string(r.Options.Mode) { + if mode != string(r.Options.GwOptions.Mode) { return false } id, ok := object.GetLabels()[string(consts.RemoteClusterID)] if !ok { return false } - return id == r.Options.RemoteClusterID + return id == r.Options.GwOptions.RemoteClusterID })) } diff --git a/pkg/liqo-controller-manager/external-network/configuration-controller/configuration-controller.go b/pkg/liqo-controller-manager/external-network/configuration-controller/configuration_controller.go similarity index 100% rename from pkg/liqo-controller-manager/external-network/configuration-controller/configuration-controller.go rename to pkg/liqo-controller-manager/external-network/configuration-controller/configuration_controller.go diff --git a/pkg/liqo-controller-manager/external-network/wireguard/utils.go b/pkg/liqo-controller-manager/external-network/wireguard/utils.go index 133ac53ee5..a9cab8ab33 100644 --- a/pkg/liqo-controller-manager/external-network/wireguard/utils.go +++ b/pkg/liqo-controller-manager/external-network/wireguard/utils.go @@ -28,7 +28,7 @@ import ( "sigs.k8s.io/controller-runtime/pkg/predicate" "github.com/liqotech/liqo/pkg/consts" - tunnel "github.com/liqotech/liqo/pkg/gateway/tunnel/wireguard" + "github.com/liqotech/liqo/pkg/gateway" liqolabels "github.com/liqotech/liqo/pkg/utils/labels" ) @@ -52,7 +52,7 @@ func wireGuardSecretEnquerer(_ context.Context, obj client.Object) []ctrl.Reques { NamespacedName: types.NamespacedName{ Namespace: secret.Namespace, - Name: tunnel.GenerateResourceName(secret.Name), + Name: gateway.GenerateResourceName(secret.Name), }, }, } diff --git a/pkg/liqonet/tunnel/driver.go b/pkg/liqonet/tunnel/driver.go index 9a9bc8b4e5..3a26dedea4 100644 --- a/pkg/liqonet/tunnel/driver.go +++ b/pkg/liqonet/tunnel/driver.go @@ -23,7 +23,7 @@ import ( "k8s.io/klog/v2" netv1alpha1 "github.com/liqotech/liqo/apis/net/v1alpha1" - "github.com/liqotech/liqo/pkg/liqonet/conncheck" + "github.com/liqotech/liqo/pkg/gateway/connection/conncheck" ) // DriverCreateFunc function prototype to create a new driver. diff --git a/pkg/liqonet/tunnel/wireguard/driver.go b/pkg/liqonet/tunnel/wireguard/driver.go index d582830ea7..252aa74b49 100644 --- a/pkg/liqonet/tunnel/wireguard/driver.go +++ b/pkg/liqonet/tunnel/wireguard/driver.go @@ -42,7 +42,7 @@ import ( discv1alpha1 "github.com/liqotech/liqo/apis/discovery/v1alpha1" netv1alpha1 "github.com/liqotech/liqo/apis/net/v1alpha1" liqoconst "github.com/liqotech/liqo/pkg/consts" - "github.com/liqotech/liqo/pkg/liqonet/conncheck" + "github.com/liqotech/liqo/pkg/gateway/connection/conncheck" "github.com/liqotech/liqo/pkg/liqonet/tunnel" "github.com/liqotech/liqo/pkg/liqonet/tunnel/metrics" "github.com/liqotech/liqo/pkg/liqonet/tunnel/resolver" @@ -282,7 +282,11 @@ func (w *Wireguard) ConnectToEndpoint(ctx context.Context, tep *netv1alpha1.Tunn klog.Infof("%s -> starting conncheck sender", tep.Spec.ClusterIdentity) - go w.Connchecker.AddAndRunSender(ctx, tep.Spec.ClusterIdentity.ClusterID, pingIP, updateStatus) + if err := w.Connchecker.AddSender(ctx, tep.Spec.ClusterIdentity.ClusterID, pingIP, updateStatus); err != nil { + return nil, fmt.Errorf("failed to add conncheck sender for cluster %s: %w", tep.Spec.ClusterIdentity, err) + } + + go w.Connchecker.RunSender(tep.Spec.ClusterIdentity.ClusterID) klog.V(4).Infof("Done connecting cluster peer %s@%s", tep.Spec.ClusterIdentity, endpoint.String()) return c, nil diff --git a/pkg/utils/time/doc.go b/pkg/utils/time/doc.go new file mode 100644 index 0000000000..c34e1f0b89 --- /dev/null +++ b/pkg/utils/time/doc.go @@ -0,0 +1,16 @@ +// Copyright 2019-2023 The Liqo Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package time contains utility functions for time management. +package time diff --git a/pkg/utils/time/time.go b/pkg/utils/time/time.go new file mode 100644 index 0000000000..afe9710c27 --- /dev/null +++ b/pkg/utils/time/time.go @@ -0,0 +1,33 @@ +// Copyright 2019-2023 The Liqo Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package time + +import ( + "fmt" + "time" + + "github.com/liqotech/liqo/pkg/consts" +) + +// FormatLatency returns a string representing the given latency in a human readable format. +func FormatLatency(latency time.Duration) string { + if latency == 0 { + return consts.NotApplicable + } + if latency.Milliseconds() > 0 { + return fmt.Sprintf("%dms", latency.Milliseconds()) + } + return fmt.Sprintf("%dμs", latency.Microseconds()) +}