diff --git a/pkg/peers/peers.go b/pkg/peers/peers.go index cd741d6d..3978af5d 100644 --- a/pkg/peers/peers.go +++ b/pkg/peers/peers.go @@ -8,6 +8,7 @@ import ( "github.com/go-logr/logr" commonlabels "github.com/medik8s/common/pkg/labels" + pkgerrors "github.com/pkg/errors" v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" @@ -76,30 +77,37 @@ func (p *Peers) Start(ctx context.Context) error { p.controlPlanePeerSelector = createSelector(hostname, getControlPlaneLabel(myNode)) } - go wait.UntilWithContext(ctx, func(ctx context.Context) { - p.updateWorkerPeers(ctx) - p.updateControlPlanePeers(ctx) - }, p.peerUpdateInterval) + var updatePeersError error + cancellableCtx, cancel := context.WithCancel(ctx) - p.log.Info("peers started") + p.log.Info("peer starting", "name", p.myNodeName) + wait.UntilWithContext(cancellableCtx, func(ctx context.Context) { + updatePeersError = p.updateWorkerPeers(ctx) + if updatePeersError != nil { + cancel() + } + updatePeersError = p.updateControlPlanePeers(ctx) + if updatePeersError != nil { + cancel() + } + }, p.peerUpdateInterval) - <-ctx.Done() - return nil + return updatePeersError } -func (p *Peers) updateWorkerPeers(ctx context.Context) { +func (p *Peers) updateWorkerPeers(ctx context.Context) error { setterFunc := func(addresses []v1.PodIP) { p.workerPeersAddresses = addresses } selectorGetter := func() labels.Selector { return p.workerPeerSelector } - p.updatePeers(ctx, selectorGetter, setterFunc) + return p.updatePeers(ctx, selectorGetter, setterFunc) } -func (p *Peers) updateControlPlanePeers(ctx context.Context) { +func (p *Peers) updateControlPlanePeers(ctx context.Context) error { setterFunc := func(addresses []v1.PodIP) { p.controlPlanePeersAddresses = addresses } selectorGetter := func() labels.Selector { return p.controlPlanePeerSelector } - p.updatePeers(ctx, selectorGetter, setterFunc) + return p.updatePeers(ctx, selectorGetter, setterFunc) } -func (p *Peers) updatePeers(ctx context.Context, getSelector func() labels.Selector, setAddresses func(addresses []v1.PodIP)) { +func (p *Peers) updatePeers(ctx context.Context, getSelector func() labels.Selector, setAddresses func(addresses []v1.PodIP)) error { p.mutex.Lock() defer p.mutex.Unlock() @@ -114,7 +122,7 @@ func (p *Peers) updatePeers(ctx context.Context, getSelector func() labels.Selec p.workerPeersAddresses = []v1.PodIP{} } p.log.Error(err, "failed to update peer list") - return + return pkgerrors.Wrap(err, "failed to update peer list") } pods := v1.PodList{} @@ -126,6 +134,7 @@ func (p *Peers) updatePeers(ctx context.Context, getSelector func() labels.Selec } if err := p.List(readerCtx, &pods, listOptions); err != nil { p.log.Error(err, "could not get pods") + return pkgerrors.Wrap(err, "could not get pods") } nodesCount := len(nodes.Items) @@ -134,14 +143,14 @@ func (p *Peers) updatePeers(ctx context.Context, getSelector func() labels.Selec for _, pod := range pods.Items { if pod.Spec.NodeName == node.Name { if pod.Status.PodIPs == nil || len(pod.Status.PodIPs) == 0 { - p.log.Info("skipping empty Pod IPs", "node", node.Name, "Pod", pod.Name) - continue + return pkgerrors.New(fmt.Sprintf("empty Pod IP for Pod %s on Node %s", pod.Name, node.Name)) } addresses[i] = pod.Status.PodIPs[0] } } } setAddresses(addresses) + return nil } func (p *Peers) GetPeersAddresses(role Role) []v1.PodIP {