Skip to content

Commit

Permalink
External Network: ping disable
Browse files Browse the repository at this point in the history
  • Loading branch information
cheina97 committed Nov 13, 2023
1 parent cbc3710 commit 264c627
Show file tree
Hide file tree
Showing 13 changed files with 175 additions and 103 deletions.
60 changes: 31 additions & 29 deletions cmd/gateway/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
networkingv1alpha1 "github.com/liqotech/liqo/apis/networking/v1alpha1"
"github.com/liqotech/liqo/pkg/gateway"
"github.com/liqotech/liqo/pkg/gateway/connection"
"github.com/liqotech/liqo/pkg/gateway/connection/conncheck"
flagsutils "github.com/liqotech/liqo/pkg/utils/flags"
"github.com/liqotech/liqo/pkg/utils/mapper"
"github.com/liqotech/liqo/pkg/utils/restcfg"
Expand All @@ -42,7 +43,10 @@ var (
addToSchemeFunctions = []func(*runtime.Scheme) error{
networkingv1alpha1.AddToScheme,
}
options = gateway.NewOptions()
options = connection.NewOptions(
gateway.NewOptions(),
conncheck.NewOptions(),
)
)

// +kubebuilder:rbac:groups=coordination.k8s.io,resources=leases,verbs=get;create;update;delete
Expand All @@ -59,17 +63,13 @@ func main() {
klog.InitFlags(legacyflags)
flagsutils.FromFlagToPflag(legacyflags, cmd.Flags())

gateway.InitFlags(cmd.Flags(), options)
gateway.InitFlags(cmd.Flags(), options.GwOptions)
if err := gateway.MarkFlagsRequired(&cmd); err != nil {
klog.Error(err)
os.Exit(1)
}

connection.InitFlags(cmd.Flags())
if err := connection.MarkFlagsRequired(&cmd); err != nil {
klog.Error(err)
os.Exit(1)
}
connection.InitFlags(cmd.Flags(), options)

if err := cmd.Execute(); err != nil {
klog.Error(err)
Expand Down Expand Up @@ -101,44 +101,46 @@ func run(_ *cobra.Command, _ []string) error {
Scheme: scheme,
Cache: cache.Options{
DefaultNamespaces: map[string]cache.Config{
options.Namespace: {},
options.GwOptions.Namespace: {},
},
},
Metrics: server.Options{
BindAddress: "0", // Metrics are exposed by "connection" container.
},
HealthProbeBindAddress: options.ProbeAddr,
LeaderElection: options.LeaderElection,
HealthProbeBindAddress: options.GwOptions.ProbeAddr,
LeaderElection: options.GwOptions.LeaderElection,
LeaderElectionID: fmt.Sprintf(
"%s.%s.%s.connections.liqo.io",
options.Name, options.Namespace, options.Mode,
options.GwOptions.Name, options.GwOptions.Namespace, options.GwOptions.Mode,
),
LeaderElectionNamespace: options.Namespace,
LeaderElectionNamespace: options.GwOptions.Namespace,
LeaderElectionReleaseOnCancel: true,
LeaderElectionResourceLock: resourcelock.LeasesResourceLock,
LeaseDuration: &options.LeaderElectionLeaseDuration,
RenewDeadline: &options.LeaderElectionRenewDeadline,
RetryPeriod: &options.LeaderElectionRetryPeriod,
LeaseDuration: &options.GwOptions.LeaderElectionLeaseDuration,
RenewDeadline: &options.GwOptions.LeaderElectionRenewDeadline,
RetryPeriod: &options.GwOptions.LeaderElectionRetryPeriod,
})
if err != nil {
return fmt.Errorf("unable to create manager: %w", err)
}

// Setup the controller.
connr, err := connection.NewConnectionsReconciler(
ctx,
mgr.GetClient(),
mgr.GetScheme(),
mgr.GetEventRecorderFor("connections-controller"),
options,
)
if err != nil {
return fmt.Errorf("unable to create connectioons reconciler: %w", err)
}
if options.EnableConnectionController {
// Setup the controller.
connr, err := connection.NewConnectionsReconciler(
ctx,
mgr.GetClient(),
mgr.GetScheme(),
mgr.GetEventRecorderFor("connections-controller"),
options,
)
if err != nil {
return fmt.Errorf("unable to create connectioons reconciler: %w", err)
}

// Setup the controller.
if err = connr.SetupWithManager(mgr); err != nil {
return fmt.Errorf("unable to setup connections reconciler: %w", err)
// Setup the controller.
if err = connr.SetupWithManager(mgr); err != nil {
return fmt.Errorf("unable to setup connections reconciler: %w", err)
}
}

// Start the manager.
Expand Down
20 changes: 17 additions & 3 deletions cmd/liqonet/gateway-operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,10 @@ type gatewayOperatorFlags struct {
tunnelListeningPort uint
updateStatusInterval time.Duration
securityMode *argsutils.StringEnum
PingPort int
PingBufferSize uint
PingLossThreshold uint
PingInterval time.Duration
}

func addGatewayOperatorFlags(liqonet *gatewayOperatorFlags) {
Expand All @@ -77,9 +81,13 @@ func addGatewayOperatorFlags(liqonet *gatewayOperatorFlags) {
"listening-port is the port used by the vpn tunnel")
flag.DurationVar(&liqonet.updateStatusInterval, "gateway.ping-latency-update-interval", 30*time.Second,
"ping-latency-update-interval is the interval at which the gateway operator updates the latency value in the status of the tunnel-endpoint")
flag.UintVar(&conncheck.PingLossThreshold, "gateway.ping-loss-threshold", 5,
flag.IntVar(&liqonet.PingPort, "gateway.ping-port", 12345,
"ping-port is the port used by the vpn tunnel")
flag.UintVar(&liqonet.PingBufferSize, "gateway.ping-buffer-size", 1024,
"ping-buffer-size is the size of the buffer used for the ping check")
flag.UintVar(&liqonet.PingLossThreshold, "gateway.ping-loss-threshold", 5,
"ping-loss-threshold is the number of lost packets after which the connection check is considered as failed.")
flag.DurationVar(&conncheck.PingInterval, "gateway.ping-interval", 2*time.Second,
flag.DurationVar(&liqonet.PingInterval, "gateway.ping-interval", 2*time.Second,
"ping-interval is the interval between two connection checks")
flag.Var(liqonet.securityMode, "gateway.security-mode", "security-mode represents different security modes regarding connectivity among clusters")
}
Expand Down Expand Up @@ -203,7 +211,13 @@ func runGatewayOperator(commonFlags *liqonetCommonFlags, gatewayFlags *gatewayOp
os.Exit(1)
}
tunnelController, err := tunneloperator.NewTunnelController(ctx, &wg, podIP.String(), podNamespace, eventRecorder,
clientset, main.GetClient(), &readyClustersMutex, readyClusters, gatewayNetns, hostNetns, int(MTU), int(port), updateStatusInterval, securityMode)
clientset, main.GetClient(), &readyClustersMutex, readyClusters, gatewayNetns, hostNetns, int(MTU), int(port), updateStatusInterval, securityMode,
&conncheck.Options{
PingPort: gatewayFlags.PingPort,
PingBufferSize: gatewayFlags.PingBufferSize,
PingLossThreshold: gatewayFlags.PingLossThreshold,
PingInterval: gatewayFlags.PingInterval,
})
// If something goes wrong while creating and configuring the tunnel controller
// then make sure that we remove all the resources created during the create process.
if err != nil {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ spec:
- --mode=client
- --metrics-address=:8080
- --health-probe-bind-address=:8081
- --ping-enabled=true
- --ping-loss-threshold={{ .Values.networking.gateway.ping.lossThreshold }}
- --ping-interval={{ .Values.networking.gateway.ping.interval }}
- --ping-update-status-interval={{ .Values.networking.gateway.ping.updateStatusInterval }}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,7 @@ spec:
- --mode=server
- --metrics-address=:8080
- --health-probe-bind-address=:8081
- --ping-loss-threshold=5
- --ping-interval=2s
- --ping-update-status-interval=10s
- --ping-enabled=true
- --ping-loss-threshold={{ .Values.networking.gateway.ping.lossThreshold }}
- --ping-interval={{ .Values.networking.gateway.ping.interval }}
- --ping-update-status-interval={{ .Values.networking.gateway.ping.updateStatusInterval }}
Expand Down
4 changes: 2 additions & 2 deletions internal/liqonet/tunnel-operator/tunnel-operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ type TunnelController struct {
func NewTunnelController(ctx context.Context, wg *sync.WaitGroup,
podIP, namespace string, er record.EventRecorder, k8sClient k8s.Interface, cl client.Client,
readyClustersMutex *sync.Mutex, readyClusters map[string]struct{}, gatewayNetns, hostNetns ns.NetNS, mtu, port int,
updateStatusInterval time.Duration, securityMode liqoconst.SecurityModeType) (*TunnelController, error) {
updateStatusInterval time.Duration, securityMode liqoconst.SecurityModeType, connCheckOpts *conncheck.Options) (*TunnelController, error) {
tunnelEndpointFinalizer := liqoconst.LiqoGatewayOperatorName + "." + liqoconst.FinalizersSuffix
tc := &TunnelController{
Client: cl,
Expand Down Expand Up @@ -146,7 +146,7 @@ func NewTunnelController(ctx context.Context, wg *sync.WaitGroup,
return fmt.Errorf("unable to enforce tunnel IP: %w", err)
}

wg.Connchecker, err = conncheck.NewConnChecker()
wg.Connchecker, err = conncheck.NewConnChecker(connCheckOpts)
if err != nil {
return fmt.Errorf("failed to create connchecker: %w", err)
}
Expand Down
12 changes: 7 additions & 5 deletions pkg/gateway/connection/conncheck/conncheck.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (

// ConnChecker is a struct that holds the receiver and senders.
type ConnChecker struct {
opts *Options
receiver *Receiver
// key is the target cluster ID.
senders map[string]*Sender
Expand All @@ -36,9 +37,9 @@ type ConnChecker struct {
}

// NewConnChecker creates a new ConnChecker.
func NewConnChecker() (*ConnChecker, error) {
func NewConnChecker(opts *Options) (*ConnChecker, error) {
addr := &net.UDPAddr{
Port: port,
Port: opts.PingPort,
IP: net.ParseIP("0.0.0.0"),
}
conn, err := net.ListenUDP("udp", addr)
Expand All @@ -47,7 +48,8 @@ func NewConnChecker() (*ConnChecker, error) {
}
klog.V(4).Infof("conncheck socket: listening on %s", addr)
connChecker := ConnChecker{
receiver: NewReceiver(conn),
opts: opts,
receiver: NewReceiver(conn, opts),
senders: make(map[string]*Sender),
runningSenders: make(map[string]*Sender),
conn: conn,
Expand Down Expand Up @@ -81,7 +83,7 @@ func (c *ConnChecker) AddSender(ctx context.Context, clusterID, ip string, updat
}

ctxSender, cancelSender := context.WithCancel(ctx)
c.senders[clusterID], err = NewSender(ctxSender, clusterID, cancelSender, c.conn, ip)
c.senders[clusterID], err = NewSender(ctxSender, c.opts, clusterID, cancelSender, c.conn, ip)
if err != nil {
return fmt.Errorf("failed to create sender: %w", err)
}
Expand All @@ -105,7 +107,7 @@ func (c *ConnChecker) RunSender(clusterID string) {

klog.Infof("conncheck sender %q starting against %q", clusterID, sender.raddr.IP.String())

if err := wait.PollUntilContextCancel(sender.Ctx, PingInterval, false, func(ctx context.Context) (done bool, err error) {
if err := wait.PollUntilContextCancel(sender.Ctx, c.opts.PingInterval, false, func(ctx context.Context) (done bool, err error) {
err = c.senders[clusterID].SendPing()
if err != nil {
klog.Warningf("failed to send ping: %s", err)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,19 @@ package conncheck

import "time"

const (
port = 12345
buffSize = 1024
)

var (
// Options contains the options for the wireguard interface.
type Options struct {
// PingPort is the port used for the ping check.
PingPort int
// PingBufferSize is the size of the buffer used for the ping check.
PingBufferSize uint
// PingLossThreshold is the number of lost packets after which the connection check is considered as failed.
PingLossThreshold uint
// PingInterval is the interval at which the ping is sent.
PingInterval time.Duration
// PingUpdateStatusInterval is the interval at which the status is updated.
PingUpdateStatusInterval time.Duration
)
}

// NewOptions returns a new Options struct.
func NewOptions() *Options {
return &Options{}
}
10 changes: 6 additions & 4 deletions pkg/gateway/connection/conncheck/receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,14 +41,16 @@ type Receiver struct {
m sync.RWMutex
buff []byte
conn *net.UDPConn
opts *Options
}

// NewReceiver creates a new conncheck receiver.
func NewReceiver(conn *net.UDPConn) *Receiver {
func NewReceiver(conn *net.UDPConn, opts *Options) *Receiver {
return &Receiver{
peers: make(map[string]*Peer),
buff: make([]byte, buffSize),
buff: make([]byte, opts.PingBufferSize),
conn: conn,
opts: opts,
}
}

Expand Down Expand Up @@ -141,12 +143,12 @@ func (r *Receiver) Run(ctx context.Context) {
func (r *Receiver) RunDisconnectObserver(ctx context.Context) {
klog.Infof("conncheck receiver disconnect checker: started")
// Ignore errors because only caused by context cancellation.
err := wait.PollUntilContextCancel(ctx, time.Duration(PingLossThreshold)*PingInterval/10, true,
err := wait.PollUntilContextCancel(ctx, time.Duration(r.opts.PingLossThreshold)*r.opts.PingInterval/10, true,
func(ctx context.Context) (done bool, err error) {
r.m.Lock()
defer r.m.Unlock()
for id, peer := range r.peers {
if time.Since(peer.lastReceivedTimestamp.Add(peer.latency)) <= PingInterval*time.Duration(PingLossThreshold) {
if time.Since(peer.lastReceivedTimestamp.Add(peer.latency)) <= r.opts.PingInterval*time.Duration(r.opts.PingLossThreshold) {
continue
}
klog.V(8).Infof("conncheck receiver: %s unreachable", id)
Expand Down
4 changes: 2 additions & 2 deletions pkg/gateway/connection/conncheck/sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ type Sender struct {
}

// NewSender creates a new conncheck sender.
func NewSender(ctx context.Context, clusterID string, cancel func(), conn *net.UDPConn, ip string) (*Sender, error) {
func NewSender(ctx context.Context, opts *Options, clusterID string, cancel func(), conn *net.UDPConn, ip string) (*Sender, error) {
pip := net.ParseIP(ip)
if pip == nil {
return nil, fmt.Errorf("conncheck sender: invalid IP address %s", ip)
Expand All @@ -44,7 +44,7 @@ func NewSender(ctx context.Context, clusterID string, cancel func(), conn *net.U
clusterID: clusterID,
cancel: cancel,
conn: conn,
raddr: net.UDPAddr{IP: pip, Port: port},
raddr: net.UDPAddr{IP: pip, Port: opts.PingPort},
}, nil
}

Expand Down
Loading

0 comments on commit 264c627

Please sign in to comment.