Skip to content

Commit

Permalink
Liqonet: connchecker refactoring
Browse files Browse the repository at this point in the history
  • Loading branch information
cheina97 committed Oct 24, 2023
1 parent aad6dfd commit 5938dee
Show file tree
Hide file tree
Showing 5 changed files with 103 additions and 32 deletions.
76 changes: 50 additions & 26 deletions pkg/gateway/connection/conncheck/conncheck.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,10 @@ import (
type ConnChecker struct {
receiver *Receiver
// key is the target cluster ID.
senders map[string]*Sender
sm sync.RWMutex
conn *net.UDPConn
senders map[string]*Sender
runningSenders map[string]*Sender
sm sync.RWMutex
conn *net.UDPConn
}

// NewConnChecker creates a new ConnChecker.
Expand All @@ -46,9 +47,10 @@ func NewConnChecker() (*ConnChecker, error) {
}
klog.V(4).Infof("conncheck socket: listening on %s", addr)
connChecker := ConnChecker{
receiver: NewReceiver(conn),
senders: make(map[string]*Sender),
conn: conn,
receiver: NewReceiver(conn),
senders: make(map[string]*Sender),
runningSenders: make(map[string]*Sender),
conn: conn,
}
return &connChecker, nil
}
Expand All @@ -63,48 +65,53 @@ func (c *ConnChecker) RunReceiverDisconnectObserver(ctx context.Context) {
c.receiver.RunDisconnectObserver(ctx)
}

// AddAndRunSender create a new sender and runs it.
func (c *ConnChecker) AddAndRunSender(ctx context.Context, clusterID, ip string, updateCallback UpdateFunc) {
// AddSender adds a sender.
func (c *ConnChecker) AddSender(ctx context.Context, clusterID, ip string, updateCallback UpdateFunc) error {
var err error

if clusterID == "" {
klog.Errorf("clusterID is empty")
return
return fmt.Errorf("clusterID cannot be empty")
}

c.sm.Lock()
defer c.sm.Unlock()

if _, ok := c.senders[clusterID]; ok {
c.sm.Unlock()
klog.Warningf("sender %s already exists", clusterID)
return
return NewDuplicateError(clusterID)
}

ctxSender, cancelSender := context.WithCancel(ctx)
c.senders[clusterID], err = NewSender(clusterID, cancelSender, c.conn, ip)

c.senders[clusterID], err = NewSender(ctxSender, clusterID, cancelSender, c.conn, ip)
if err != nil {
c.sm.Unlock()
klog.Errorf("failed to create sender: %w", err)
return
return fmt.Errorf("failed to create sender: %w", err)
}

err = c.receiver.InitPeer(clusterID, updateCallback)
if err != nil {
c.sm.Unlock()
klog.Errorf("failed to add redirect chan: %w", err)
return fmt.Errorf("failed to init peer: %w", err)
}

klog.Infof("conncheck sender %q added", clusterID, ip)
return nil
}

// RunSender runs a sender.
func (c *ConnChecker) RunSender(clusterID string) {
sender, err := c.setRunning(clusterID)
if err != nil {
klog.Errorf("conncheck sender %s doesn't start for an error: %s", clusterID, err)
return
}

klog.Infof("conncheck sender %q starting against %q", clusterID, ip)
pingCallback := func(ctx context.Context) (done bool, err error) {
klog.Infof("conncheck sender %q starting against %q", clusterID, sender.raddr.IP.String())

if err := wait.PollUntilContextCancel(sender.Ctx, PingInterval, false, func(ctx context.Context) (done bool, err error) {
err = c.senders[clusterID].SendPing()
if err != nil {
klog.Warningf("failed to send ping: %s", err)
}
return false, nil
}
c.sm.Unlock()

if err := wait.PollUntilContextCancel(ctxSender, PingInterval, false, pingCallback); err != nil {
}); err != nil {
klog.Errorf("conncheck sender %s stopped for an error: %s", clusterID, err)
}

Expand All @@ -123,6 +130,8 @@ func (c *ConnChecker) DelAndStopSender(clusterID string) {
c.senders[clusterID].cancel()
delete(c.senders, clusterID)
}

delete(c.runningSenders, clusterID)
delete(c.receiver.peers, clusterID)
}

Expand All @@ -145,3 +154,18 @@ func (c *ConnChecker) GetConnected(clusterID string) (bool, error) {
}
return false, fmt.Errorf("sender %s not found", clusterID)
}

func (c *ConnChecker) setRunning(clusterID string) (*Sender, error) {
c.sm.Lock()
defer c.sm.Unlock()
sender, ok := c.senders[clusterID]
if !ok {
return nil, fmt.Errorf("sender %s not found", clusterID)
}

if _, ok := c.runningSenders[clusterID]; ok {
return nil, fmt.Errorf("sender %s already running", clusterID)
}
c.runningSenders[clusterID] = sender
return sender, nil
}
33 changes: 33 additions & 0 deletions pkg/gateway/connection/conncheck/error.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
// Copyright 2019-2023 The Liqo Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package conncheck

import "fmt"

// DuplicateError is an error type for ConnChecker.
// It is returned when an already present sender is added.
type DuplicateError struct {
Err error
}

// NewDuplicateError returns a new DuplicateError.
func NewDuplicateError(clusterID string) *DuplicateError {
return &DuplicateError{Err: fmt.Errorf("sender %s already added", clusterID)}
}

// Error returns the error message.
func (e DuplicateError) Error() string {
return e.Err.Error()
}
5 changes: 4 additions & 1 deletion pkg/gateway/connection/conncheck/sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package conncheck

import (
"context"
"encoding/json"
"fmt"
"net"
Expand All @@ -25,19 +26,21 @@ import (

// Sender is a sender for the conncheck server.
type Sender struct {
Ctx context.Context
clusterID string
cancel func()
conn *net.UDPConn
raddr net.UDPAddr
}

// NewSender creates a new conncheck sender.
func NewSender(clusterID string, cancel func(), conn *net.UDPConn, ip string) (*Sender, error) {
func NewSender(ctx context.Context, clusterID string, cancel func(), conn *net.UDPConn, ip string) (*Sender, error) {
pip := net.ParseIP(ip)
if pip == nil {
return nil, fmt.Errorf("conncheck sender: invalid IP address %s", ip)
}
return &Sender{
Ctx: ctx,
clusterID: clusterID,
cancel: cancel,
conn: conn,
Expand Down
15 changes: 11 additions & 4 deletions pkg/gateway/connection/connections_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,10 +82,17 @@ func (r *ConnectionsReconciler) Reconcile(ctx context.Context, req ctrl.Request)
return ctrl.Result{}, fmt.Errorf("unable to get the remote interface IP: %w", err)
}

go r.ConnChecker.AddAndRunSender(
ctx, r.Options.RemoteClusterID, remoteIP,
ForgeUpdateConnectionCallback(ctx, r.Client, req),
)
err = r.ConnChecker.AddSender(ctx, r.Options.RemoteClusterID, remoteIP, ForgeUpdateConnectionCallback(ctx, r.Client, req))
if err != nil {
switch err.(type) {
case *conncheck.DuplicateError:
return ctrl.Result{}, nil
default:
return ctrl.Result{}, fmt.Errorf("unable to add the sender: %w", err)
}
}

go r.ConnChecker.RunSender(r.Options.RemoteClusterID)

return ctrl.Result{}, nil
}
Expand Down
6 changes: 5 additions & 1 deletion pkg/liqonet/tunnel/wireguard/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,11 @@ func (w *Wireguard) ConnectToEndpoint(ctx context.Context, tep *netv1alpha1.Tunn

klog.Infof("%s -> starting conncheck sender", tep.Spec.ClusterIdentity)

go w.Connchecker.AddAndRunSender(ctx, tep.Spec.ClusterIdentity.ClusterID, pingIP, updateStatus)
if err := w.Connchecker.AddSender(ctx, tep.Spec.ClusterIdentity.ClusterID, pingIP, updateStatus); err != nil {
return nil, fmt.Errorf("failed to add conncheck sender for cluster %s: %w", tep.Spec.ClusterIdentity, err)
}

go w.Connchecker.RunSender(tep.Spec.ClusterIdentity.ClusterID)

klog.V(4).Infof("Done connecting cluster peer %s@%s", tep.Spec.ClusterIdentity, endpoint.String())
return c, nil
Expand Down

0 comments on commit 5938dee

Please sign in to comment.