From 264c6279f05c9012e2e55e7cf34f59b5ecc703b3 Mon Sep 17 00:00:00 2001 From: Francesco Cheinasso Date: Tue, 3 Oct 2023 11:33:52 +0200 Subject: [PATCH] External Network: ping disable --- cmd/gateway/main.go | 60 ++++++++++--------- cmd/liqonet/gateway-operator.go | 20 ++++++- ...iqo-wireguard-gateway-client-template.yaml | 1 + ...iqo-wireguard-gateway-server-template.yaml | 4 +- .../tunnel-operator/tunnel-operator.go | 4 +- pkg/gateway/connection/conncheck/conncheck.go | 12 ++-- .../conncheck/{consts.go => options.go} | 21 ++++--- pkg/gateway/connection/conncheck/receiver.go | 10 ++-- pkg/gateway/connection/conncheck/sender.go | 4 +- .../connection/connections_controller.go | 49 ++++++++------- pkg/gateway/connection/flags.go | 43 +++++++------ pkg/gateway/connection/k8s.go | 5 +- pkg/gateway/connection/options.go | 45 ++++++++++++++ 13 files changed, 175 insertions(+), 103 deletions(-) rename pkg/gateway/connection/conncheck/{consts.go => options.go} (71%) create mode 100644 pkg/gateway/connection/options.go diff --git a/cmd/gateway/main.go b/cmd/gateway/main.go index ff7c7ba4c4..9567839065 100644 --- a/cmd/gateway/main.go +++ b/cmd/gateway/main.go @@ -33,6 +33,7 @@ import ( networkingv1alpha1 "github.com/liqotech/liqo/apis/networking/v1alpha1" "github.com/liqotech/liqo/pkg/gateway" "github.com/liqotech/liqo/pkg/gateway/connection" + "github.com/liqotech/liqo/pkg/gateway/connection/conncheck" flagsutils "github.com/liqotech/liqo/pkg/utils/flags" "github.com/liqotech/liqo/pkg/utils/mapper" "github.com/liqotech/liqo/pkg/utils/restcfg" @@ -42,7 +43,10 @@ var ( addToSchemeFunctions = []func(*runtime.Scheme) error{ networkingv1alpha1.AddToScheme, } - options = gateway.NewOptions() + options = connection.NewOptions( + gateway.NewOptions(), + conncheck.NewOptions(), + ) ) // +kubebuilder:rbac:groups=coordination.k8s.io,resources=leases,verbs=get;create;update;delete @@ -59,17 +63,13 @@ func main() { klog.InitFlags(legacyflags) flagsutils.FromFlagToPflag(legacyflags, cmd.Flags()) - gateway.InitFlags(cmd.Flags(), options) + gateway.InitFlags(cmd.Flags(), options.GwOptions) if err := gateway.MarkFlagsRequired(&cmd); err != nil { klog.Error(err) os.Exit(1) } - connection.InitFlags(cmd.Flags()) - if err := connection.MarkFlagsRequired(&cmd); err != nil { - klog.Error(err) - os.Exit(1) - } + connection.InitFlags(cmd.Flags(), options) if err := cmd.Execute(); err != nil { klog.Error(err) @@ -101,44 +101,46 @@ func run(_ *cobra.Command, _ []string) error { Scheme: scheme, Cache: cache.Options{ DefaultNamespaces: map[string]cache.Config{ - options.Namespace: {}, + options.GwOptions.Namespace: {}, }, }, Metrics: server.Options{ BindAddress: "0", // Metrics are exposed by "connection" container. }, - HealthProbeBindAddress: options.ProbeAddr, - LeaderElection: options.LeaderElection, + HealthProbeBindAddress: options.GwOptions.ProbeAddr, + LeaderElection: options.GwOptions.LeaderElection, LeaderElectionID: fmt.Sprintf( "%s.%s.%s.connections.liqo.io", - options.Name, options.Namespace, options.Mode, + options.GwOptions.Name, options.GwOptions.Namespace, options.GwOptions.Mode, ), - LeaderElectionNamespace: options.Namespace, + LeaderElectionNamespace: options.GwOptions.Namespace, LeaderElectionReleaseOnCancel: true, LeaderElectionResourceLock: resourcelock.LeasesResourceLock, - LeaseDuration: &options.LeaderElectionLeaseDuration, - RenewDeadline: &options.LeaderElectionRenewDeadline, - RetryPeriod: &options.LeaderElectionRetryPeriod, + LeaseDuration: &options.GwOptions.LeaderElectionLeaseDuration, + RenewDeadline: &options.GwOptions.LeaderElectionRenewDeadline, + RetryPeriod: &options.GwOptions.LeaderElectionRetryPeriod, }) if err != nil { return fmt.Errorf("unable to create manager: %w", err) } - // Setup the controller. - connr, err := connection.NewConnectionsReconciler( - ctx, - mgr.GetClient(), - mgr.GetScheme(), - mgr.GetEventRecorderFor("connections-controller"), - options, - ) - if err != nil { - return fmt.Errorf("unable to create connectioons reconciler: %w", err) - } + if options.EnableConnectionController { + // Setup the controller. + connr, err := connection.NewConnectionsReconciler( + ctx, + mgr.GetClient(), + mgr.GetScheme(), + mgr.GetEventRecorderFor("connections-controller"), + options, + ) + if err != nil { + return fmt.Errorf("unable to create connectioons reconciler: %w", err) + } - // Setup the controller. - if err = connr.SetupWithManager(mgr); err != nil { - return fmt.Errorf("unable to setup connections reconciler: %w", err) + // Setup the controller. + if err = connr.SetupWithManager(mgr); err != nil { + return fmt.Errorf("unable to setup connections reconciler: %w", err) + } } // Start the manager. diff --git a/cmd/liqonet/gateway-operator.go b/cmd/liqonet/gateway-operator.go index 4c7ae14373..743e5bffa7 100644 --- a/cmd/liqonet/gateway-operator.go +++ b/cmd/liqonet/gateway-operator.go @@ -57,6 +57,10 @@ type gatewayOperatorFlags struct { tunnelListeningPort uint updateStatusInterval time.Duration securityMode *argsutils.StringEnum + PingPort int + PingBufferSize uint + PingLossThreshold uint + PingInterval time.Duration } func addGatewayOperatorFlags(liqonet *gatewayOperatorFlags) { @@ -77,9 +81,13 @@ func addGatewayOperatorFlags(liqonet *gatewayOperatorFlags) { "listening-port is the port used by the vpn tunnel") flag.DurationVar(&liqonet.updateStatusInterval, "gateway.ping-latency-update-interval", 30*time.Second, "ping-latency-update-interval is the interval at which the gateway operator updates the latency value in the status of the tunnel-endpoint") - flag.UintVar(&conncheck.PingLossThreshold, "gateway.ping-loss-threshold", 5, + flag.IntVar(&liqonet.PingPort, "gateway.ping-port", 12345, + "ping-port is the port used by the vpn tunnel") + flag.UintVar(&liqonet.PingBufferSize, "gateway.ping-buffer-size", 1024, + "ping-buffer-size is the size of the buffer used for the ping check") + flag.UintVar(&liqonet.PingLossThreshold, "gateway.ping-loss-threshold", 5, "ping-loss-threshold is the number of lost packets after which the connection check is considered as failed.") - flag.DurationVar(&conncheck.PingInterval, "gateway.ping-interval", 2*time.Second, + flag.DurationVar(&liqonet.PingInterval, "gateway.ping-interval", 2*time.Second, "ping-interval is the interval between two connection checks") flag.Var(liqonet.securityMode, "gateway.security-mode", "security-mode represents different security modes regarding connectivity among clusters") } @@ -203,7 +211,13 @@ func runGatewayOperator(commonFlags *liqonetCommonFlags, gatewayFlags *gatewayOp os.Exit(1) } tunnelController, err := tunneloperator.NewTunnelController(ctx, &wg, podIP.String(), podNamespace, eventRecorder, - clientset, main.GetClient(), &readyClustersMutex, readyClusters, gatewayNetns, hostNetns, int(MTU), int(port), updateStatusInterval, securityMode) + clientset, main.GetClient(), &readyClustersMutex, readyClusters, gatewayNetns, hostNetns, int(MTU), int(port), updateStatusInterval, securityMode, + &conncheck.Options{ + PingPort: gatewayFlags.PingPort, + PingBufferSize: gatewayFlags.PingBufferSize, + PingLossThreshold: gatewayFlags.PingLossThreshold, + PingInterval: gatewayFlags.PingInterval, + }) // If something goes wrong while creating and configuring the tunnel controller // then make sure that we remove all the resources created during the create process. if err != nil { diff --git a/deployments/liqo/templates/liqo-wireguard-gateway-client-template.yaml b/deployments/liqo/templates/liqo-wireguard-gateway-client-template.yaml index 4b4b6df4d2..70abb23b00 100644 --- a/deployments/liqo/templates/liqo-wireguard-gateway-client-template.yaml +++ b/deployments/liqo/templates/liqo-wireguard-gateway-client-template.yaml @@ -41,6 +41,7 @@ spec: - --mode=client - --metrics-address=:8080 - --health-probe-bind-address=:8081 + - --ping-enabled=true - --ping-loss-threshold={{ .Values.networking.gateway.ping.lossThreshold }} - --ping-interval={{ .Values.networking.gateway.ping.interval }} - --ping-update-status-interval={{ .Values.networking.gateway.ping.updateStatusInterval }} diff --git a/deployments/liqo/templates/liqo-wireguard-gateway-server-template.yaml b/deployments/liqo/templates/liqo-wireguard-gateway-server-template.yaml index 0f2d461930..f05ab8eab9 100644 --- a/deployments/liqo/templates/liqo-wireguard-gateway-server-template.yaml +++ b/deployments/liqo/templates/liqo-wireguard-gateway-server-template.yaml @@ -52,9 +52,7 @@ spec: - --mode=server - --metrics-address=:8080 - --health-probe-bind-address=:8081 - - --ping-loss-threshold=5 - - --ping-interval=2s - - --ping-update-status-interval=10s + - --ping-enabled=true - --ping-loss-threshold={{ .Values.networking.gateway.ping.lossThreshold }} - --ping-interval={{ .Values.networking.gateway.ping.interval }} - --ping-update-status-interval={{ .Values.networking.gateway.ping.updateStatusInterval }} diff --git a/internal/liqonet/tunnel-operator/tunnel-operator.go b/internal/liqonet/tunnel-operator/tunnel-operator.go index a523b9509a..1b488d2ddc 100644 --- a/internal/liqonet/tunnel-operator/tunnel-operator.go +++ b/internal/liqonet/tunnel-operator/tunnel-operator.go @@ -89,7 +89,7 @@ type TunnelController struct { func NewTunnelController(ctx context.Context, wg *sync.WaitGroup, podIP, namespace string, er record.EventRecorder, k8sClient k8s.Interface, cl client.Client, readyClustersMutex *sync.Mutex, readyClusters map[string]struct{}, gatewayNetns, hostNetns ns.NetNS, mtu, port int, - updateStatusInterval time.Duration, securityMode liqoconst.SecurityModeType) (*TunnelController, error) { + updateStatusInterval time.Duration, securityMode liqoconst.SecurityModeType, connCheckOpts *conncheck.Options) (*TunnelController, error) { tunnelEndpointFinalizer := liqoconst.LiqoGatewayOperatorName + "." + liqoconst.FinalizersSuffix tc := &TunnelController{ Client: cl, @@ -146,7 +146,7 @@ func NewTunnelController(ctx context.Context, wg *sync.WaitGroup, return fmt.Errorf("unable to enforce tunnel IP: %w", err) } - wg.Connchecker, err = conncheck.NewConnChecker() + wg.Connchecker, err = conncheck.NewConnChecker(connCheckOpts) if err != nil { return fmt.Errorf("failed to create connchecker: %w", err) } diff --git a/pkg/gateway/connection/conncheck/conncheck.go b/pkg/gateway/connection/conncheck/conncheck.go index d3c3e0855b..4a57c02e6a 100644 --- a/pkg/gateway/connection/conncheck/conncheck.go +++ b/pkg/gateway/connection/conncheck/conncheck.go @@ -27,6 +27,7 @@ import ( // ConnChecker is a struct that holds the receiver and senders. type ConnChecker struct { + opts *Options receiver *Receiver // key is the target cluster ID. senders map[string]*Sender @@ -36,9 +37,9 @@ type ConnChecker struct { } // NewConnChecker creates a new ConnChecker. -func NewConnChecker() (*ConnChecker, error) { +func NewConnChecker(opts *Options) (*ConnChecker, error) { addr := &net.UDPAddr{ - Port: port, + Port: opts.PingPort, IP: net.ParseIP("0.0.0.0"), } conn, err := net.ListenUDP("udp", addr) @@ -47,7 +48,8 @@ func NewConnChecker() (*ConnChecker, error) { } klog.V(4).Infof("conncheck socket: listening on %s", addr) connChecker := ConnChecker{ - receiver: NewReceiver(conn), + opts: opts, + receiver: NewReceiver(conn, opts), senders: make(map[string]*Sender), runningSenders: make(map[string]*Sender), conn: conn, @@ -81,7 +83,7 @@ func (c *ConnChecker) AddSender(ctx context.Context, clusterID, ip string, updat } ctxSender, cancelSender := context.WithCancel(ctx) - c.senders[clusterID], err = NewSender(ctxSender, clusterID, cancelSender, c.conn, ip) + c.senders[clusterID], err = NewSender(ctxSender, c.opts, clusterID, cancelSender, c.conn, ip) if err != nil { return fmt.Errorf("failed to create sender: %w", err) } @@ -105,7 +107,7 @@ func (c *ConnChecker) RunSender(clusterID string) { klog.Infof("conncheck sender %q starting against %q", clusterID, sender.raddr.IP.String()) - if err := wait.PollUntilContextCancel(sender.Ctx, PingInterval, false, func(ctx context.Context) (done bool, err error) { + if err := wait.PollUntilContextCancel(sender.Ctx, c.opts.PingInterval, false, func(ctx context.Context) (done bool, err error) { err = c.senders[clusterID].SendPing() if err != nil { klog.Warningf("failed to send ping: %s", err) diff --git a/pkg/gateway/connection/conncheck/consts.go b/pkg/gateway/connection/conncheck/options.go similarity index 71% rename from pkg/gateway/connection/conncheck/consts.go rename to pkg/gateway/connection/conncheck/options.go index a2a2a03a0c..d82a2649f6 100644 --- a/pkg/gateway/connection/conncheck/consts.go +++ b/pkg/gateway/connection/conncheck/options.go @@ -16,16 +16,19 @@ package conncheck import "time" -const ( - port = 12345 - buffSize = 1024 -) - -var ( +// Options contains the options for the wireguard interface. +type Options struct { + // PingPort is the port used for the ping check. + PingPort int + // PingBufferSize is the size of the buffer used for the ping check. + PingBufferSize uint // PingLossThreshold is the number of lost packets after which the connection check is considered as failed. PingLossThreshold uint // PingInterval is the interval at which the ping is sent. PingInterval time.Duration - // PingUpdateStatusInterval is the interval at which the status is updated. - PingUpdateStatusInterval time.Duration -) +} + +// NewOptions returns a new Options struct. +func NewOptions() *Options { + return &Options{} +} diff --git a/pkg/gateway/connection/conncheck/receiver.go b/pkg/gateway/connection/conncheck/receiver.go index 88c1a237a2..27143fe449 100644 --- a/pkg/gateway/connection/conncheck/receiver.go +++ b/pkg/gateway/connection/conncheck/receiver.go @@ -41,14 +41,16 @@ type Receiver struct { m sync.RWMutex buff []byte conn *net.UDPConn + opts *Options } // NewReceiver creates a new conncheck receiver. -func NewReceiver(conn *net.UDPConn) *Receiver { +func NewReceiver(conn *net.UDPConn, opts *Options) *Receiver { return &Receiver{ peers: make(map[string]*Peer), - buff: make([]byte, buffSize), + buff: make([]byte, opts.PingBufferSize), conn: conn, + opts: opts, } } @@ -141,12 +143,12 @@ func (r *Receiver) Run(ctx context.Context) { func (r *Receiver) RunDisconnectObserver(ctx context.Context) { klog.Infof("conncheck receiver disconnect checker: started") // Ignore errors because only caused by context cancellation. - err := wait.PollUntilContextCancel(ctx, time.Duration(PingLossThreshold)*PingInterval/10, true, + err := wait.PollUntilContextCancel(ctx, time.Duration(r.opts.PingLossThreshold)*r.opts.PingInterval/10, true, func(ctx context.Context) (done bool, err error) { r.m.Lock() defer r.m.Unlock() for id, peer := range r.peers { - if time.Since(peer.lastReceivedTimestamp.Add(peer.latency)) <= PingInterval*time.Duration(PingLossThreshold) { + if time.Since(peer.lastReceivedTimestamp.Add(peer.latency)) <= r.opts.PingInterval*time.Duration(r.opts.PingLossThreshold) { continue } klog.V(8).Infof("conncheck receiver: %s unreachable", id) diff --git a/pkg/gateway/connection/conncheck/sender.go b/pkg/gateway/connection/conncheck/sender.go index dd63ab92f0..2eb24a0a13 100644 --- a/pkg/gateway/connection/conncheck/sender.go +++ b/pkg/gateway/connection/conncheck/sender.go @@ -34,7 +34,7 @@ type Sender struct { } // NewSender creates a new conncheck sender. -func NewSender(ctx context.Context, clusterID string, cancel func(), conn *net.UDPConn, ip string) (*Sender, error) { +func NewSender(ctx context.Context, opts *Options, clusterID string, cancel func(), conn *net.UDPConn, ip string) (*Sender, error) { pip := net.ParseIP(ip) if pip == nil { return nil, fmt.Errorf("conncheck sender: invalid IP address %s", ip) @@ -44,7 +44,7 @@ func NewSender(ctx context.Context, clusterID string, cancel func(), conn *net.U clusterID: clusterID, cancel: cancel, conn: conn, - raddr: net.UDPAddr{IP: pip, Port: port}, + raddr: net.UDPAddr{IP: pip, Port: opts.PingPort}, }, nil } diff --git a/pkg/gateway/connection/connections_controller.go b/pkg/gateway/connection/connections_controller.go index 50b3b0a8c8..51a972069c 100644 --- a/pkg/gateway/connection/connections_controller.go +++ b/pkg/gateway/connection/connections_controller.go @@ -30,7 +30,6 @@ import ( networkingv1alpha1 "github.com/liqotech/liqo/apis/networking/v1alpha1" "github.com/liqotech/liqo/pkg/consts" - "github.com/liqotech/liqo/pkg/gateway" "github.com/liqotech/liqo/pkg/gateway/connection/conncheck" "github.com/liqotech/liqo/pkg/gateway/tunnel/common" ) @@ -45,13 +44,13 @@ type ConnectionsReconciler struct { Client client.Client Scheme *runtime.Scheme EventsRecorder record.EventRecorder - Options *gateway.Options + Options *Options } // NewConnectionsReconciler returns a new PublicKeysReconciler. func NewConnectionsReconciler(ctx context.Context, cl client.Client, - s *runtime.Scheme, er record.EventRecorder, options *gateway.Options) (*ConnectionsReconciler, error) { - connchecker, err := conncheck.NewConnChecker() + s *runtime.Scheme, er record.EventRecorder, options *Options) (*ConnectionsReconciler, error) { + connchecker, err := conncheck.NewConnChecker(options.ConnCheckOptions) if err != nil { return nil, fmt.Errorf("unable to create the connection checker: %w", err) } @@ -76,23 +75,33 @@ func (r *ConnectionsReconciler) Reconcile(ctx context.Context, req ctrl.Request) } return ctrl.Result{}, fmt.Errorf("unable to get the connection %q: %w", req.NamespacedName, err) } + klog.Infof("Reconciling connection %q", req.NamespacedName) - remoteIP, err := common.GetRemoteInterfaceIP(r.Options.Mode) - if err != nil { - return ctrl.Result{}, fmt.Errorf("unable to get the remote interface IP: %w", err) - } + updateConnection := ForgeUpdateConnectionCallback(ctx, r.Client, r.Options, req) - err = r.ConnChecker.AddSender(ctx, r.Options.RemoteClusterID, remoteIP, ForgeUpdateConnectionCallback(ctx, r.Client, req)) - if err != nil { - switch err.(type) { - case *conncheck.DuplicateError: - return ctrl.Result{}, nil - default: - return ctrl.Result{}, fmt.Errorf("unable to add the sender: %w", err) + switch r.Options.PingEnabled { + case true: + remoteIP, err := common.GetRemoteInterfaceIP(r.Options.GwOptions.Mode) + if err != nil { + return ctrl.Result{}, fmt.Errorf("unable to get the remote interface IP: %w", err) } - } - go r.ConnChecker.RunSender(r.Options.RemoteClusterID) + err = r.ConnChecker.AddSender(ctx, r.Options.GwOptions.RemoteClusterID, remoteIP, updateConnection) + if err != nil { + switch err.(type) { + case *conncheck.DuplicateError: + return ctrl.Result{}, nil + default: + return ctrl.Result{}, fmt.Errorf("unable to add the sender: %w", err) + } + } + + go r.ConnChecker.RunSender(r.Options.GwOptions.RemoteClusterID) + case false: + if err := updateConnection(true, 0, time.Time{}); err != nil { + return ctrl.Result{}, fmt.Errorf("unable to update the connection status: %w", err) + } + } return ctrl.Result{}, nil } @@ -115,12 +124,12 @@ func (r *ConnectionsReconciler) Predicates() builder.Predicates { if connection.Labels == nil { return false } - return connection.Labels[string(consts.RemoteClusterID)] == r.Options.RemoteClusterID + return connection.Labels[string(consts.RemoteClusterID)] == r.Options.GwOptions.RemoteClusterID })) } // ForgeUpdateConnectionCallback forges the UpdateConnectionStatus function. -func ForgeUpdateConnectionCallback(ctx context.Context, cl client.Client, req ctrl.Request) conncheck.UpdateFunc { +func ForgeUpdateConnectionCallback(ctx context.Context, cl client.Client, opts *Options, req ctrl.Request) conncheck.UpdateFunc { return func(connected bool, latency time.Duration, timestamp time.Time) error { connection := &networkingv1alpha1.Connection{} if err := cl.Get(ctx, req.NamespacedName, connection); err != nil { @@ -133,6 +142,6 @@ func ForgeUpdateConnectionCallback(ctx context.Context, cl client.Client, req ct case false: connStatusValue = networkingv1alpha1.ConnectionError } - return UpdateConnectionStatus(ctx, cl, connection, connStatusValue, latency, timestamp) + return UpdateConnectionStatus(ctx, cl, opts, connection, connStatusValue, latency, timestamp) } } diff --git a/pkg/gateway/connection/flags.go b/pkg/gateway/connection/flags.go index 29ee8e67d7..24629399b3 100644 --- a/pkg/gateway/connection/flags.go +++ b/pkg/gateway/connection/flags.go @@ -17,10 +17,7 @@ package connection import ( "time" - "github.com/spf13/cobra" "github.com/spf13/pflag" - - "github.com/liqotech/liqo/pkg/gateway/connection/conncheck" ) // FlagName is the type for the name of the flags. @@ -31,6 +28,14 @@ func (fn FlagName) String() string { } const ( + // EnableConnectionControllerFlag is the name of the flag used to enable the connection controller. + EnableConnectionControllerFlag FlagName = "enable-connection-controller" + // PingEnabledFlag is the name of the flag used to enable the ping check. + PingEnabledFlag FlagName = "ping-enabled" + // PingPortFlag is the name of the flag used to set the ping port. + PingPortFlag FlagName = "ping-port" + // PingBufferSizeFlag is the name of the flag used to set the ping buffer size. + PingBufferSizeFlag FlagName = "ping-buffer-size" // PingLossThresholdFlag is the name of the flag used to set the ping loss threshold. PingLossThresholdFlag FlagName = "ping-loss-threshold" // PingIntervalFlag is the name of the flag used to set the ping interval. @@ -39,28 +44,20 @@ const ( PingUpdateStatusIntervalFlag FlagName = "ping-update-status-interval" ) -// RequiredFlags contains the list of the mandatory flags. -var RequiredFlags = []FlagName{ - PingLossThresholdFlag, - PingIntervalFlag, -} - // InitFlags initializes the flags for the wireguard tunnel. -func InitFlags(flagset *pflag.FlagSet) { - flagset.UintVar(&conncheck.PingLossThreshold, PingLossThresholdFlag.String(), 5, +func InitFlags(flagset *pflag.FlagSet, options *Options) { + flagset.BoolVar(&options.EnableConnectionController, EnableConnectionControllerFlag.String(), true, + "enable-connection-controller enables the connection controller. It is useful if the tunnel technology implements a connection check.") + flagset.BoolVar(&options.PingEnabled, PingEnabledFlag.String(), true, + "ping-enabled enables the ping check. If disabled the connection resource will be always connected and the latency won't be available.") + flagset.IntVar(&options.ConnCheckOptions.PingPort, PingPortFlag.String(), 12345, + "ping-port is the port used for the ping check") + flagset.UintVar(&options.ConnCheckOptions.PingBufferSize, PingBufferSizeFlag.String(), 1024, + "ping-buffer-size is the size of the buffer used for the ping check") + flagset.UintVar(&options.ConnCheckOptions.PingLossThreshold, PingLossThresholdFlag.String(), 5, "ping-loss-threshold is the number of lost packets after which the connection check is considered as failed.") - flagset.DurationVar(&conncheck.PingInterval, PingIntervalFlag.String(), 2*time.Second, + flagset.DurationVar(&options.ConnCheckOptions.PingInterval, PingIntervalFlag.String(), 2*time.Second, "ping-interval is the interval between two connection checks") - flagset.DurationVar(&conncheck.PingUpdateStatusInterval, PingUpdateStatusIntervalFlag.String(), 10*time.Second, + flagset.DurationVar(&options.PingUpdateStatusInterval, PingUpdateStatusIntervalFlag.String(), 10*time.Second, "ping-update-status-interval is the interval at which the status is updated") } - -// MarkFlagsRequired marks the flags as required. -func MarkFlagsRequired(cmd *cobra.Command) error { - for _, flag := range RequiredFlags { - if err := cmd.MarkFlagRequired(flag.String()); err != nil { - return err - } - } - return nil -} diff --git a/pkg/gateway/connection/k8s.go b/pkg/gateway/connection/k8s.go index ece23ab6fe..f3c0a26a19 100644 --- a/pkg/gateway/connection/k8s.go +++ b/pkg/gateway/connection/k8s.go @@ -24,15 +24,14 @@ import ( "sigs.k8s.io/controller-runtime/pkg/client" networkingv1alpha1 "github.com/liqotech/liqo/apis/networking/v1alpha1" - "github.com/liqotech/liqo/pkg/gateway/connection/conncheck" timeutils "github.com/liqotech/liqo/pkg/utils/time" ) // UpdateConnectionStatus updates the status of a connection. -func UpdateConnectionStatus(ctx context.Context, cl client.Client, connection *networkingv1alpha1.Connection, +func UpdateConnectionStatus(ctx context.Context, cl client.Client, opts *Options, connection *networkingv1alpha1.Connection, value networkingv1alpha1.ConnectionStatusValue, latency time.Duration, timestamp time.Time) error { if connection.Status.Value != value || - timestamp.Sub(connection.Status.Latency.Timestamp.Time) > conncheck.PingUpdateStatusInterval { + timestamp.Sub(connection.Status.Latency.Timestamp.Time) > opts.PingUpdateStatusInterval { if connection.Status.Value != value { klog.Infof("changing connection %q status to %q", client.ObjectKeyFromObject(connection).String(), value) diff --git a/pkg/gateway/connection/options.go b/pkg/gateway/connection/options.go new file mode 100644 index 0000000000..18266db7d2 --- /dev/null +++ b/pkg/gateway/connection/options.go @@ -0,0 +1,45 @@ +// Copyright 2019-2023 The Liqo Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package connection + +import ( + "time" + + "github.com/liqotech/liqo/pkg/gateway" + "github.com/liqotech/liqo/pkg/gateway/connection/conncheck" +) + +// Options contains the options for the wireguard interface. +type Options struct { + // EnableConnectionController enables the connection controller. + EnableConnectionController bool + // GwOptions contains the options for the wireguard interface. + GwOptions *gateway.Options + // ConnCheckOptions contains the options for the connchecker. + ConnCheckOptions *conncheck.Options + // PingEnabled enables the ping check. + PingEnabled bool + // PingUpdateStatusInterval is the interval at which the status is updated. + PingUpdateStatusInterval time.Duration +} + +// NewOptions returns a new Options struct. +func NewOptions(gwOptions *gateway.Options, + conncheckOptions *conncheck.Options) *Options { + return &Options{ + GwOptions: gwOptions, + ConnCheckOptions: conncheckOptions, + } +}