Skip to content

Commit

Permalink
Return error if cannot update peers address
Browse files Browse the repository at this point in the history
At startup (but it might happen in other moments too), some peers' Pod
IP can still be empty, which means that until the next peers update we
cannot check the connection with the other peers.

Return an error in case a peer's Pod IP is empty.

Signed-off-by: Carlo Lobrano <[email protected]>
  • Loading branch information
clobrano authored and openshift-cherrypick-robot committed Jul 9, 2024
1 parent e5f9432 commit 71a7ab8
Showing 1 changed file with 24 additions and 15 deletions.
39 changes: 24 additions & 15 deletions pkg/peers/peers.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (

"github.com/go-logr/logr"
commonlabels "github.com/medik8s/common/pkg/labels"
pkgerrors "github.com/pkg/errors"

v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
Expand Down Expand Up @@ -76,30 +77,37 @@ func (p *Peers) Start(ctx context.Context) error {
p.controlPlanePeerSelector = createSelector(hostname, getControlPlaneLabel(myNode))
}

go wait.UntilWithContext(ctx, func(ctx context.Context) {
p.updateWorkerPeers(ctx)
p.updateControlPlanePeers(ctx)
}, p.peerUpdateInterval)
var updatePeersError error
cancellableCtx, cancel := context.WithCancel(ctx)

p.log.Info("peers started")
p.log.Info("peer starting", "name", p.myNodeName)
wait.UntilWithContext(cancellableCtx, func(ctx context.Context) {
updatePeersError = p.updateWorkerPeers(ctx)
if updatePeersError != nil {
cancel()
}
updatePeersError = p.updateControlPlanePeers(ctx)
if updatePeersError != nil {
cancel()
}
}, p.peerUpdateInterval)

<-ctx.Done()
return nil
return updatePeersError
}

func (p *Peers) updateWorkerPeers(ctx context.Context) {
func (p *Peers) updateWorkerPeers(ctx context.Context) error {
setterFunc := func(addresses []v1.PodIP) { p.workerPeersAddresses = addresses }
selectorGetter := func() labels.Selector { return p.workerPeerSelector }
p.updatePeers(ctx, selectorGetter, setterFunc)
return p.updatePeers(ctx, selectorGetter, setterFunc)
}

func (p *Peers) updateControlPlanePeers(ctx context.Context) {
func (p *Peers) updateControlPlanePeers(ctx context.Context) error {
setterFunc := func(addresses []v1.PodIP) { p.controlPlanePeersAddresses = addresses }
selectorGetter := func() labels.Selector { return p.controlPlanePeerSelector }
p.updatePeers(ctx, selectorGetter, setterFunc)
return p.updatePeers(ctx, selectorGetter, setterFunc)
}

func (p *Peers) updatePeers(ctx context.Context, getSelector func() labels.Selector, setAddresses func(addresses []v1.PodIP)) {
func (p *Peers) updatePeers(ctx context.Context, getSelector func() labels.Selector, setAddresses func(addresses []v1.PodIP)) error {
p.mutex.Lock()
defer p.mutex.Unlock()

Expand All @@ -114,7 +122,7 @@ func (p *Peers) updatePeers(ctx context.Context, getSelector func() labels.Selec
p.workerPeersAddresses = []v1.PodIP{}
}
p.log.Error(err, "failed to update peer list")
return
return pkgerrors.Wrap(err, "failed to update peer list")
}

pods := v1.PodList{}
Expand All @@ -126,6 +134,7 @@ func (p *Peers) updatePeers(ctx context.Context, getSelector func() labels.Selec
}
if err := p.List(readerCtx, &pods, listOptions); err != nil {
p.log.Error(err, "could not get pods")
return pkgerrors.Wrap(err, "could not get pods")
}

nodesCount := len(nodes.Items)
Expand All @@ -134,14 +143,14 @@ func (p *Peers) updatePeers(ctx context.Context, getSelector func() labels.Selec
for _, pod := range pods.Items {
if pod.Spec.NodeName == node.Name {
if pod.Status.PodIPs == nil || len(pod.Status.PodIPs) == 0 {
p.log.Info("skipping empty Pod IPs", "node", node.Name, "Pod", pod.Name)
continue
return pkgerrors.New(fmt.Sprintf("empty Pod IP for Pod %s on Node %s", pod.Name, node.Name))
}
addresses[i] = pod.Status.PodIPs[0]
}
}
}
setAddresses(addresses)
return nil
}

func (p *Peers) GetPeersAddresses(role Role) []v1.PodIP {
Expand Down

0 comments on commit 71a7ab8

Please sign in to comment.