Use Pod IP for peer communication
Signed-off-by: Carlo Lobrano <[email protected]>
clobrano committed Jun 13, 2024
1 parent c4a785c · commit a71be9c
Showing 2 changed files with 33 additions and 21 deletions.
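
The gist of the change: instead of collecting each peer node's NodeAddress list, the peer updater now looks up the self-node-remediation agent pod running on that node and records its Pod IP. Below is a minimal, self-contained sketch of that mapping, using simplified stand-in types rather than the Kubernetes API objects and controller-runtime client that the real updatePeers in peers.go uses:

package main

import "fmt"

// Simplified stand-ins for the Kubernetes objects used in peers.go.
type Node struct{ Name string }

type Pod struct {
	NodeName string // pod.Spec.NodeName
	PodIP    string // pod.Status.PodIP
}

// peerAddresses maps each node to the Pod IP of the agent pod scheduled on it.
// Nodes without a matching agent pod keep an empty string, mirroring the diff,
// where addresses[i] is only set when pod.Spec.NodeName == node.Name.
func peerAddresses(nodes []Node, agentPods []Pod) []string {
	addresses := make([]string, len(nodes))
	for i, node := range nodes {
		for _, pod := range agentPods {
			if pod.NodeName == node.Name {
				addresses[i] = pod.PodIP
			}
		}
	}
	return addresses
}

func main() {
	nodes := []Node{{Name: "worker-0"}, {Name: "worker-1"}}
	pods := []Pod{
		{NodeName: "worker-0", PodIP: "10.128.0.12"},
		{NodeName: "worker-1", PodIP: "10.129.2.7"},
	}
	fmt.Println(peerAddresses(nodes, pods)) // [10.128.0.12 10.129.2.7]
}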
pkg/apicheck/check.go: 10 changes (5 additions, 5 deletions)
@@ -10,7 +10,6 @@ import (
 	"github.com/go-logr/logr"
 	"google.golang.org/grpc/credentials"
 
-	v1 "k8s.io/api/core/v1"
 	"k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset"
 	"k8s.io/apimachinery/pkg/util/wait"
 	"k8s.io/client-go/rest"
@@ -214,7 +213,7 @@ func (c *ApiConnectivityCheck) canOtherControlPlanesBeReached() bool {
 	return (healthyResponses + unhealthyResponses + apiErrorsResponses) > 0
 }
 
-func (c *ApiConnectivityCheck) popNodes(nodes *[][]v1.NodeAddress, count int) []string {
+func (c *ApiConnectivityCheck) popNodes(nodes *[]string, count int) []string {
 	nrOfNodes := len(*nodes)
 	if nrOfNodes == 0 {
 		return []string{}
@@ -227,12 +226,13 @@ func (c *ApiConnectivityCheck) popNodes(nodes *[][]v1.NodeAddress, count int) []
 	//todo maybe we should pick nodes randomly rather than relying on the order returned from api-server
 	addresses := make([]string, count)
 	for i := 0; i < count; i++ {
-		nodeAddresses := (*nodes)[i]
-		if len(nodeAddresses) == 0 || nodeAddresses[0].Address == "" {
+		// TODO: shall we need to get "count" addresses anyway, replacing empty IP with another Node?
+		address := (*nodes)[i]
+		if address == "" {
 			c.config.Log.Info("ignoring node without IP address")
 			continue
 		}
-		addresses[i] = nodeAddresses[0].Address //todo node might have multiple addresses or none
+		addresses[i] = address
 	}
 
 	*nodes = (*nodes)[count:] //remove popped nodes from the list
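With peer addresses held as plain strings, popNodes simply takes addresses off the front of the slice and drops the old per-node NodeAddress indexing. Here is a standalone sketch of that behaviour; the clamp on count is assumed for illustration, since the real bounds handling sits in the collapsed part of the diff:

package main

import "fmt"

// popNodes returns up to count addresses from the front of *nodes and removes
// them from the slice. Empty entries are skipped (left as "" in the result),
// mirroring the "ignoring node without IP address" branch in check.go.
func popNodes(nodes *[]string, count int) []string {
	if len(*nodes) == 0 {
		return []string{}
	}
	if count > len(*nodes) {
		count = len(*nodes) // assumed clamp; the real check is outside this hunk
	}
	addresses := make([]string, count)
	for i := 0; i < count; i++ {
		address := (*nodes)[i]
		if address == "" {
			continue
		}
		addresses[i] = address
	}
	*nodes = (*nodes)[count:] // remove popped nodes from the list
	return addresses
}

func main() {
	peers := []string{"10.128.0.12", "", "10.129.2.7"}
	fmt.Println(popNodes(&peers, 2)) // [10.128.0.12 ]
	fmt.Println(peers)               // [10.129.2.7]
}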
pkg/peers/peers.go: 44 changes (28 additions, 16 deletions)
@@ -36,7 +36,7 @@ type Peers struct {
 	myNodeName string
 	mutex sync.Mutex
 	apiServerTimeout time.Duration
-	workerPeersAddresses, controlPlanePeersAddresses [][]v1.NodeAddress
+	workerPeersAddresses, controlPlanePeersAddresses []string
 }
 
 func New(myNodeName string, peerUpdateInterval time.Duration, reader client.Reader, log logr.Logger, apiServerTimeout time.Duration) *Peers {
@@ -47,8 +47,8 @@ func New(myNodeName string, peerUpdateInterval time.Duration, reader client.Read
 		myNodeName: myNodeName,
 		mutex: sync.Mutex{},
 		apiServerTimeout: apiServerTimeout,
-		workerPeersAddresses: [][]v1.NodeAddress{},
-		controlPlanePeersAddresses: [][]v1.NodeAddress{},
+		workerPeersAddresses: []string{},
+		controlPlanePeersAddresses: []string{},
 	}
 }
 
@@ -88,18 +88,18 @@ func (p *Peers) Start(ctx context.Context) error {
 }
 
 func (p *Peers) updateWorkerPeers(ctx context.Context) {
-	setterFunc := func(addresses [][]v1.NodeAddress) { p.workerPeersAddresses = addresses }
+	setterFunc := func(addresses []string) { p.workerPeersAddresses = addresses }
 	selectorGetter := func() labels.Selector { return p.workerPeerSelector }
 	p.updatePeers(ctx, selectorGetter, setterFunc)
 }
 
 func (p *Peers) updateControlPlanePeers(ctx context.Context) {
-	setterFunc := func(addresses [][]v1.NodeAddress) { p.controlPlanePeersAddresses = addresses }
+	setterFunc := func(addresses []string) { p.controlPlanePeersAddresses = addresses }
 	selectorGetter := func() labels.Selector { return p.controlPlanePeerSelector }
 	p.updatePeers(ctx, selectorGetter, setterFunc)
 }
 
-func (p *Peers) updatePeers(ctx context.Context, getSelector func() labels.Selector, setAddresses func(addresses [][]v1.NodeAddress)) {
+func (p *Peers) updatePeers(ctx context.Context, getSelector func() labels.Selector, setAddresses func(addresses []string)) {
 	p.mutex.Lock()
 	defer p.mutex.Unlock()
 
@@ -111,37 +111,49 @@ func (p *Peers) updatePeers(ctx context.Context, getSelector func() labels.Selec
 	if err := p.List(readerCtx, &nodes, client.MatchingLabelsSelector{Selector: getSelector()}); err != nil {
 		if errors.IsNotFound(err) {
 			// we are the only node at the moment... reset peerList
-			p.workerPeersAddresses = [][]v1.NodeAddress{}
+			p.workerPeersAddresses = []string{}
 		}
 		p.log.Error(err, "failed to update peer list")
 		return
 	}
 
+	pods := v1.PodList{}
+	listOptions := &client.ListOptions{
+		LabelSelector: labels.SelectorFromSet(labels.Set{
+			"app.kubernetes.io/name": "self-node-remediation",
+			"app.kubernetes.io/component": "agent",
+		}),
+	}
+	if err := p.List(readerCtx, &pods, listOptions); err != nil {
+		p.log.Error(err, "could not get pods")
+	}
+
 	nodesCount := len(nodes.Items)
-	addresses := make([][]v1.NodeAddress, nodesCount)
+	addresses := make([]string, nodesCount)
 	for i, node := range nodes.Items {
-		addresses[i] = node.Status.Addresses
+		for _, pod := range pods.Items {
+			if pod.Spec.NodeName == node.Name {
+				addresses[i] = pod.Status.PodIP
+			}
+		}
 	}
 	setAddresses(addresses)
 }
 
-func (p *Peers) GetPeersAddresses(role Role) [][]v1.NodeAddress {
+func (p *Peers) GetPeersAddresses(role Role) []string {
 	p.mutex.Lock()
 	defer p.mutex.Unlock()
 
-	var addresses [][]v1.NodeAddress
+	var addresses []string
 	if role == Worker {
 		addresses = p.workerPeersAddresses
 	} else {
 		addresses = p.controlPlanePeersAddresses
 	}
 	//we don't want the caller to be able to change the addresses
 	//so we create a deep copy and return it
-	addressesCopy := make([][]v1.NodeAddress, len(addresses))
-	for i := range addressesCopy {
-		addressesCopy[i] = make([]v1.NodeAddress, len(addresses[i]))
-		copy(addressesCopy, addresses)
-	}
+	addressesCopy := make([]string, len(addresses))
+	copy(addressesCopy, addresses)
 
 	return addressesCopy
 }
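A side effect of storing []string is that the defensive copy in GetPeersAddresses collapses to a single copy call: the caller gets an independent slice, and since strings are immutable there is nothing deeper to clone. A small sketch of that property (the function name here is illustrative, not the package API):

package main

import "fmt"

// copyAddresses returns an independent copy of a []string peer list.
func copyAddresses(addresses []string) []string {
	addressesCopy := make([]string, len(addresses))
	copy(addressesCopy, addresses)
	return addressesCopy
}

func main() {
	internal := []string{"10.128.0.12", "10.129.2.7"}
	external := copyAddresses(internal)

	// Mutating the returned copy does not affect the internal slice.
	external[0] = "tampered"
	fmt.Println(internal) // [10.128.0.12 10.129.2.7]
	fmt.Println(external) // [tampered 10.129.2.7]
}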
