From 71a7ab8759d790efcdac39c541b8e78d1ec9b33e Mon Sep 17 00:00:00 2001 From: Carlo Lobrano Date: Wed, 3 Jul 2024 11:07:45 +0200 Subject: [PATCH] Return error if cannot update peers address At startup (but it might happen in other moments too), some peers' Pod IP can still be empty, which means that until the next peers update we cannot check the connection with the other peers. Return an error in case a peer's Pod IP is empty. Signed-off-by: Carlo Lobrano --- pkg/peers/peers.go | 39 ++++++++++++++++++++++++--------------- 1 file changed, 24 insertions(+), 15 deletions(-) diff --git a/pkg/peers/peers.go b/pkg/peers/peers.go index cd741d6d..3978af5d 100644 --- a/pkg/peers/peers.go +++ b/pkg/peers/peers.go @@ -8,6 +8,7 @@ import ( "github.com/go-logr/logr" commonlabels "github.com/medik8s/common/pkg/labels" + pkgerrors "github.com/pkg/errors" v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" @@ -76,30 +77,37 @@ func (p *Peers) Start(ctx context.Context) error { p.controlPlanePeerSelector = createSelector(hostname, getControlPlaneLabel(myNode)) } - go wait.UntilWithContext(ctx, func(ctx context.Context) { - p.updateWorkerPeers(ctx) - p.updateControlPlanePeers(ctx) - }, p.peerUpdateInterval) + var updatePeersError error + cancellableCtx, cancel := context.WithCancel(ctx) - p.log.Info("peers started") + p.log.Info("peer starting", "name", p.myNodeName) + wait.UntilWithContext(cancellableCtx, func(ctx context.Context) { + updatePeersError = p.updateWorkerPeers(ctx) + if updatePeersError != nil { + cancel() + } + updatePeersError = p.updateControlPlanePeers(ctx) + if updatePeersError != nil { + cancel() + } + }, p.peerUpdateInterval) - <-ctx.Done() - return nil + return updatePeersError } -func (p *Peers) updateWorkerPeers(ctx context.Context) { +func (p *Peers) updateWorkerPeers(ctx context.Context) error { setterFunc := func(addresses []v1.PodIP) { p.workerPeersAddresses = addresses } selectorGetter := func() labels.Selector { return p.workerPeerSelector } - p.updatePeers(ctx, selectorGetter, setterFunc) + return p.updatePeers(ctx, selectorGetter, setterFunc) } -func (p *Peers) updateControlPlanePeers(ctx context.Context) { +func (p *Peers) updateControlPlanePeers(ctx context.Context) error { setterFunc := func(addresses []v1.PodIP) { p.controlPlanePeersAddresses = addresses } selectorGetter := func() labels.Selector { return p.controlPlanePeerSelector } - p.updatePeers(ctx, selectorGetter, setterFunc) + return p.updatePeers(ctx, selectorGetter, setterFunc) } -func (p *Peers) updatePeers(ctx context.Context, getSelector func() labels.Selector, setAddresses func(addresses []v1.PodIP)) { +func (p *Peers) updatePeers(ctx context.Context, getSelector func() labels.Selector, setAddresses func(addresses []v1.PodIP)) error { p.mutex.Lock() defer p.mutex.Unlock() @@ -114,7 +122,7 @@ func (p *Peers) updatePeers(ctx context.Context, getSelector func() labels.Selec p.workerPeersAddresses = []v1.PodIP{} } p.log.Error(err, "failed to update peer list") - return + return pkgerrors.Wrap(err, "failed to update peer list") } pods := v1.PodList{} @@ -126,6 +134,7 @@ func (p *Peers) updatePeers(ctx context.Context, getSelector func() labels.Selec } if err := p.List(readerCtx, &pods, listOptions); err != nil { p.log.Error(err, "could not get pods") + return pkgerrors.Wrap(err, "could not get pods") } nodesCount := len(nodes.Items) @@ -134,14 +143,14 @@ func (p *Peers) updatePeers(ctx context.Context, getSelector func() labels.Selec for _, pod := range pods.Items { if pod.Spec.NodeName == node.Name { if pod.Status.PodIPs == nil || len(pod.Status.PodIPs) == 0 { - p.log.Info("skipping empty Pod IPs", "node", node.Name, "Pod", pod.Name) - continue + return pkgerrors.New(fmt.Sprintf("empty Pod IP for Pod %s on Node %s", pod.Name, node.Name)) } addresses[i] = pod.Status.PodIPs[0] } } } setAddresses(addresses) + return nil } func (p *Peers) GetPeersAddresses(role Role) []v1.PodIP {