From 3eeeaebaa3661e4125bbc5568cadd87818fc0a1c Mon Sep 17 00:00:00 2001 From: Carlo Lobrano Date: Wed, 12 Jun 2024 13:28:00 +0200 Subject: [PATCH 1/4] Use Pod IP for peer communication Signed-off-by: Carlo Lobrano --- pkg/apicheck/check.go | 10 +++++----- pkg/peers/peers.go | 44 +++++++++++++++++++++++++++---------------- 2 files changed, 33 insertions(+), 21 deletions(-) diff --git a/pkg/apicheck/check.go b/pkg/apicheck/check.go index 0a5d87c37..f1aa46bf9 100644 --- a/pkg/apicheck/check.go +++ b/pkg/apicheck/check.go @@ -10,7 +10,6 @@ import ( "github.com/go-logr/logr" "google.golang.org/grpc/credentials" - v1 "k8s.io/api/core/v1" "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/rest" @@ -199,7 +198,7 @@ func (c *ApiConnectivityCheck) canOtherControlPlanesBeReached() bool { return (healthyResponses + unhealthyResponses + apiErrorsResponses) > 0 } -func (c *ApiConnectivityCheck) popNodes(nodes *[][]v1.NodeAddress, count int) []string { +func (c *ApiConnectivityCheck) popNodes(nodes *[]string, count int) []string { nrOfNodes := len(*nodes) if nrOfNodes == 0 { return []string{} @@ -212,12 +211,13 @@ func (c *ApiConnectivityCheck) popNodes(nodes *[][]v1.NodeAddress, count int) [] //todo maybe we should pick nodes randomly rather than relying on the order returned from api-server addresses := make([]string, count) for i := 0; i < count; i++ { - nodeAddresses := (*nodes)[i] - if len(nodeAddresses) == 0 || nodeAddresses[0].Address == "" { + // TODO: shall we need to get "count" addresses anyway, replacing empty IP with another Node? + address := (*nodes)[i] + if address == "" { c.config.Log.Info("ignoring node without IP address") continue } - addresses[i] = nodeAddresses[0].Address //todo node might have multiple addresses or none + addresses[i] = address } *nodes = (*nodes)[count:] //remove popped nodes from the list diff --git a/pkg/peers/peers.go b/pkg/peers/peers.go index ce28dc02b..540be0bf6 100644 --- a/pkg/peers/peers.go +++ b/pkg/peers/peers.go @@ -36,7 +36,7 @@ type Peers struct { myNodeName string mutex sync.Mutex apiServerTimeout time.Duration - workerPeersAddresses, controlPlanePeersAddresses [][]v1.NodeAddress + workerPeersAddresses, controlPlanePeersAddresses []string } func New(myNodeName string, peerUpdateInterval time.Duration, reader client.Reader, log logr.Logger, apiServerTimeout time.Duration) *Peers { @@ -47,8 +47,8 @@ func New(myNodeName string, peerUpdateInterval time.Duration, reader client.Read myNodeName: myNodeName, mutex: sync.Mutex{}, apiServerTimeout: apiServerTimeout, - workerPeersAddresses: [][]v1.NodeAddress{}, - controlPlanePeersAddresses: [][]v1.NodeAddress{}, + workerPeersAddresses: []string{}, + controlPlanePeersAddresses: []string{}, } } @@ -88,18 +88,18 @@ func (p *Peers) Start(ctx context.Context) error { } func (p *Peers) updateWorkerPeers(ctx context.Context) { - setterFunc := func(addresses [][]v1.NodeAddress) { p.workerPeersAddresses = addresses } + setterFunc := func(addresses []string) { p.workerPeersAddresses = addresses } selectorGetter := func() labels.Selector { return p.workerPeerSelector } p.updatePeers(ctx, selectorGetter, setterFunc) } func (p *Peers) updateControlPlanePeers(ctx context.Context) { - setterFunc := func(addresses [][]v1.NodeAddress) { p.controlPlanePeersAddresses = addresses } + setterFunc := func(addresses []string) { p.controlPlanePeersAddresses = addresses } selectorGetter := func() labels.Selector { return p.controlPlanePeerSelector } p.updatePeers(ctx, 
selectorGetter, setterFunc) } -func (p *Peers) updatePeers(ctx context.Context, getSelector func() labels.Selector, setAddresses func(addresses [][]v1.NodeAddress)) { +func (p *Peers) updatePeers(ctx context.Context, getSelector func() labels.Selector, setAddresses func(addresses []string)) { p.mutex.Lock() defer p.mutex.Unlock() @@ -111,25 +111,40 @@ func (p *Peers) updatePeers(ctx context.Context, getSelector func() labels.Selec if err := p.List(readerCtx, &nodes, client.MatchingLabelsSelector{Selector: getSelector()}); err != nil { if errors.IsNotFound(err) { // we are the only node at the moment... reset peerList - p.workerPeersAddresses = [][]v1.NodeAddress{} + p.workerPeersAddresses = []string{} } p.log.Error(err, "failed to update peer list") return } + pods := v1.PodList{} + listOptions := &client.ListOptions{ + LabelSelector: labels.SelectorFromSet(labels.Set{ + "app.kubernetes.io/name": "self-node-remediation", + "app.kubernetes.io/component": "agent", + }), + } + if err := p.List(readerCtx, &pods, listOptions); err != nil { + p.log.Error(err, "could not get pods") + } + nodesCount := len(nodes.Items) - addresses := make([][]v1.NodeAddress, nodesCount) + addresses := make([]string, nodesCount) for i, node := range nodes.Items { - addresses[i] = node.Status.Addresses + for _, pod := range pods.Items { + if pod.Spec.NodeName == node.Name { + addresses[i] = pod.Status.PodIP + } + } } setAddresses(addresses) } -func (p *Peers) GetPeersAddresses(role Role) [][]v1.NodeAddress { +func (p *Peers) GetPeersAddresses(role Role) []string { p.mutex.Lock() defer p.mutex.Unlock() - var addresses [][]v1.NodeAddress + var addresses []string if role == Worker { addresses = p.workerPeersAddresses } else { @@ -137,11 +152,8 @@ func (p *Peers) GetPeersAddresses(role Role) [][]v1.NodeAddress { } //we don't want the caller to be able to change the addresses //so we create a deep copy and return it - addressesCopy := make([][]v1.NodeAddress, len(addresses)) - for i := range addressesCopy { - addressesCopy[i] = make([]v1.NodeAddress, len(addresses[i])) - copy(addressesCopy, addresses) - } + addressesCopy := make([]string, len(addresses)) + copy(addressesCopy, addresses) return addressesCopy } From 54086ffb23aeaf39c7ac4792150e5bd3b6a9bea3 Mon Sep 17 00:00:00 2001 From: Carlo Lobrano Date: Mon, 17 Jun 2024 14:40:41 +0200 Subject: [PATCH 2/4] Update terminology to reflect Pod IP usage in place of Node IP Signed-off-by: Carlo Lobrano --- pkg/apicheck/check.go | 60 +++++++++++++++++++++---------------------- 1 file changed, 30 insertions(+), 30 deletions(-) diff --git a/pkg/apicheck/check.go b/pkg/apicheck/check.go index f1aa46bf9..e8e8713a4 100644 --- a/pkg/apicheck/check.go +++ b/pkg/apicheck/check.go @@ -127,21 +127,21 @@ func (c *ApiConnectivityCheck) getWorkerPeersResponse() peers.Response { } c.config.Log.Info("Error count exceeds threshold, trying to ask other nodes if I'm healthy") - nodesToAsk := c.config.Peers.GetPeersAddresses(peers.Worker) - if nodesToAsk == nil || len(nodesToAsk) == 0 { + peersToAsk := c.config.Peers.GetPeersAddresses(peers.Worker) + if peersToAsk == nil || len(peersToAsk) == 0 { c.config.Log.Info("Peers list is empty and / or couldn't be retrieved from server, nothing we can do, so consider the node being healthy") - //todo maybe we need to check if this happens too much and reboot + // TODO: maybe we need to check if this happens too much and reboot return peers.Response{IsHealthy: true, Reason: peers.HealthyBecauseNoPeersWereFound} } apiErrorsResponsesSum := 0 - nrAllNodes := 
len(nodesToAsk) - // nodesToAsk is being reduced in every iteration, iterate until no nodes left to ask - for i := 0; len(nodesToAsk) > 0; i++ { + nrAllPeers := len(peersToAsk) + // peersToAsk is being reduced at every iteration, iterate until no peers left to ask + for i := 0; len(peersToAsk) > 0; i++ { - batchSize := utils.GetNextBatchSize(nrAllNodes, len(nodesToAsk)) - chosenNodesAddresses := c.popNodes(&nodesToAsk, batchSize) - healthyResponses, unhealthyResponses, apiErrorsResponses, _ := c.getHealthStatusFromPeers(chosenNodesAddresses) + batchSize := utils.GetNextBatchSize(nrAllPeers, len(peersToAsk)) + chosenPeersIPs := c.popPeerIPs(&peersToAsk, batchSize) + healthyResponses, unhealthyResponses, apiErrorsResponses, _ := c.getHealthStatusFromPeers(chosenPeersIPs) if healthyResponses+unhealthyResponses+apiErrorsResponses > 0 { c.timeOfLastPeerResponse = time.Now() } @@ -160,9 +160,9 @@ func (c *ApiConnectivityCheck) getWorkerPeersResponse() peers.Response { if apiErrorsResponses > 0 { c.config.Log.Info("Peer can't access the api-server") apiErrorsResponsesSum += apiErrorsResponses - //todo consider using [m|n]hc.spec.maxUnhealthy instead of 50% - if apiErrorsResponsesSum > nrAllNodes/2 { //already reached more than 50% of the nodes and all of them returned api error - //assuming this is a control plane failure as others can't access api-server as well + // TODO: consider using [m|n]hc.spec.maxUnhealthy instead of 50% + if apiErrorsResponsesSum > nrAllPeers/2 { // already reached more than 50% of the peers and all of them returned api error + // assuming this is a control plane failure as others can't access api-server as well c.config.Log.Info("More than 50% of the nodes couldn't access the api-server, assuming this is a control plane failure") return peers.Response{IsHealthy: true, Reason: peers.HealthyBecauseMostPeersCantAccessAPIServer} } @@ -184,45 +184,45 @@ func (c *ApiConnectivityCheck) getWorkerPeersResponse() peers.Response { } func (c *ApiConnectivityCheck) canOtherControlPlanesBeReached() bool { - nodesToAsk := c.config.Peers.GetPeersAddresses(peers.ControlPlane) - numOfControlPlanePeers := len(nodesToAsk) + peersToAsk := c.config.Peers.GetPeersAddresses(peers.ControlPlane) + numOfControlPlanePeers := len(peersToAsk) if numOfControlPlanePeers == 0 { c.config.Log.Info("Peers list is empty and / or couldn't be retrieved from server, other control planes can't be reached") return false } - chosenNodesAddresses := c.popNodes(&nodesToAsk, numOfControlPlanePeers) - healthyResponses, unhealthyResponses, apiErrorsResponses, _ := c.getHealthStatusFromPeers(chosenNodesAddresses) + chosenPeersIPs := c.popPeerIPs(&peersToAsk, numOfControlPlanePeers) + healthyResponses, unhealthyResponses, apiErrorsResponses, _ := c.getHealthStatusFromPeers(chosenPeersIPs) // Any response is an indication of communication with a peer return (healthyResponses + unhealthyResponses + apiErrorsResponses) > 0 } -func (c *ApiConnectivityCheck) popNodes(nodes *[]string, count int) []string { - nrOfNodes := len(*nodes) - if nrOfNodes == 0 { +func (c *ApiConnectivityCheck) popPeerIPs(peersIPs *[]string, count int) []string { + nrOfPeers := len(*peersIPs) + if nrOfPeers == 0 { return []string{} } - if count > nrOfNodes { - count = nrOfNodes + if count > nrOfPeers { + count = nrOfPeers } - //todo maybe we should pick nodes randomly rather than relying on the order returned from api-server - addresses := make([]string, count) + // TODO: maybe we should pick nodes randomly rather than relying on the order returned 
from api-server + selectedIPs := make([]string, count) for i := 0; i < count; i++ { - // TODO: shall we need to get "count" addresses anyway, replacing empty IP with another Node? - address := (*nodes)[i] - if address == "" { - c.config.Log.Info("ignoring node without IP address") + ip := (*peersIPs)[i] + if ip == "" { + // This should not happen, but keeping it for good measure. + c.config.Log.Info("ignoring peers without IP address") continue } - addresses[i] = address + selectedIPs[i] = ip } - *nodes = (*nodes)[count:] //remove popped nodes from the list + *peersIPs = (*peersIPs)[count:] //remove popped nodes from the list - return addresses + return selectedIPs } func (c *ApiConnectivityCheck) getHealthStatusFromPeers(addresses []string) (int, int, int, int) { From 99a70db528e8e5c15d302d5049f2d93c90925015 Mon Sep 17 00:00:00 2001 From: Carlo Lobrano Date: Mon, 24 Jun 2024 19:39:11 +0200 Subject: [PATCH 3/4] Use core/v1 PodIP type in place than string Signed-off-by: Carlo Lobrano --- pkg/apicheck/check.go | 17 +++++++++-------- pkg/peers/peers.go | 28 ++++++++++++++++------------ 2 files changed, 25 insertions(+), 20 deletions(-) diff --git a/pkg/apicheck/check.go b/pkg/apicheck/check.go index e8e8713a4..fda7e9019 100644 --- a/pkg/apicheck/check.go +++ b/pkg/apicheck/check.go @@ -10,6 +10,7 @@ import ( "github.com/go-logr/logr" "google.golang.org/grpc/credentials" + corev1 "k8s.io/api/core/v1" "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/rest" @@ -198,10 +199,10 @@ func (c *ApiConnectivityCheck) canOtherControlPlanesBeReached() bool { return (healthyResponses + unhealthyResponses + apiErrorsResponses) > 0 } -func (c *ApiConnectivityCheck) popPeerIPs(peersIPs *[]string, count int) []string { +func (c *ApiConnectivityCheck) popPeerIPs(peersIPs *[]corev1.PodIP, count int) []corev1.PodIP { nrOfPeers := len(*peersIPs) if nrOfPeers == 0 { - return []string{} + return []corev1.PodIP{} } if count > nrOfPeers { @@ -209,10 +210,10 @@ func (c *ApiConnectivityCheck) popPeerIPs(peersIPs *[]string, count int) []strin } // TODO: maybe we should pick nodes randomly rather than relying on the order returned from api-server - selectedIPs := make([]string, count) + selectedIPs := make([]corev1.PodIP, count) for i := 0; i < count; i++ { ip := (*peersIPs)[i] - if ip == "" { + if ip.IP == "" { // This should not happen, but keeping it for good measure. 
c.config.Log.Info("ignoring peers without IP address") continue @@ -225,7 +226,7 @@ func (c *ApiConnectivityCheck) popPeerIPs(peersIPs *[]string, count int) []strin return selectedIPs } -func (c *ApiConnectivityCheck) getHealthStatusFromPeers(addresses []string) (int, int, int, int) { +func (c *ApiConnectivityCheck) getHealthStatusFromPeers(addresses []corev1.PodIP) (int, int, int, int) { nrAddresses := len(addresses) responsesChan := make(chan selfNodeRemediation.HealthCheckResponseCode, nrAddresses) @@ -237,9 +238,9 @@ func (c *ApiConnectivityCheck) getHealthStatusFromPeers(addresses []string) (int } // getHealthStatusFromPeer issues a GET request to the specified IP and returns the result from the peer into the given channel -func (c *ApiConnectivityCheck) getHealthStatusFromPeer(endpointIp string, results chan<- selfNodeRemediation.HealthCheckResponseCode) { +func (c *ApiConnectivityCheck) getHealthStatusFromPeer(endpointIp corev1.PodIP, results chan<- selfNodeRemediation.HealthCheckResponseCode) { - logger := c.config.Log.WithValues("IP", endpointIp) + logger := c.config.Log.WithValues("IP", endpointIp.IP) logger.Info("getting health status from peer") if err := c.initClientCreds(); err != nil { @@ -249,7 +250,7 @@ func (c *ApiConnectivityCheck) getHealthStatusFromPeer(endpointIp string, result } // TODO does this work with IPv6? - phClient, err := peerhealth.NewClient(fmt.Sprintf("%v:%v", endpointIp, c.config.PeerHealthPort), c.config.PeerDialTimeout, c.config.Log.WithName("peerhealth client"), c.clientCreds) + phClient, err := peerhealth.NewClient(fmt.Sprintf("%v:%v", endpointIp.IP, c.config.PeerHealthPort), c.config.PeerDialTimeout, c.config.Log.WithName("peerhealth client"), c.clientCreds) if err != nil { logger.Error(err, "failed to init grpc client") results <- selfNodeRemediation.RequestFailed diff --git a/pkg/peers/peers.go b/pkg/peers/peers.go index 540be0bf6..cd741d6df 100644 --- a/pkg/peers/peers.go +++ b/pkg/peers/peers.go @@ -36,7 +36,7 @@ type Peers struct { myNodeName string mutex sync.Mutex apiServerTimeout time.Duration - workerPeersAddresses, controlPlanePeersAddresses []string + workerPeersAddresses, controlPlanePeersAddresses []v1.PodIP } func New(myNodeName string, peerUpdateInterval time.Duration, reader client.Reader, log logr.Logger, apiServerTimeout time.Duration) *Peers { @@ -47,8 +47,8 @@ func New(myNodeName string, peerUpdateInterval time.Duration, reader client.Read myNodeName: myNodeName, mutex: sync.Mutex{}, apiServerTimeout: apiServerTimeout, - workerPeersAddresses: []string{}, - controlPlanePeersAddresses: []string{}, + workerPeersAddresses: []v1.PodIP{}, + controlPlanePeersAddresses: []v1.PodIP{}, } } @@ -88,18 +88,18 @@ func (p *Peers) Start(ctx context.Context) error { } func (p *Peers) updateWorkerPeers(ctx context.Context) { - setterFunc := func(addresses []string) { p.workerPeersAddresses = addresses } + setterFunc := func(addresses []v1.PodIP) { p.workerPeersAddresses = addresses } selectorGetter := func() labels.Selector { return p.workerPeerSelector } p.updatePeers(ctx, selectorGetter, setterFunc) } func (p *Peers) updateControlPlanePeers(ctx context.Context) { - setterFunc := func(addresses []string) { p.controlPlanePeersAddresses = addresses } + setterFunc := func(addresses []v1.PodIP) { p.controlPlanePeersAddresses = addresses } selectorGetter := func() labels.Selector { return p.controlPlanePeerSelector } p.updatePeers(ctx, selectorGetter, setterFunc) } -func (p *Peers) updatePeers(ctx context.Context, getSelector func() labels.Selector, 
setAddresses func(addresses []string)) { +func (p *Peers) updatePeers(ctx context.Context, getSelector func() labels.Selector, setAddresses func(addresses []v1.PodIP)) { p.mutex.Lock() defer p.mutex.Unlock() @@ -111,7 +111,7 @@ func (p *Peers) updatePeers(ctx context.Context, getSelector func() labels.Selec if err := p.List(readerCtx, &nodes, client.MatchingLabelsSelector{Selector: getSelector()}); err != nil { if errors.IsNotFound(err) { // we are the only node at the moment... reset peerList - p.workerPeersAddresses = []string{} + p.workerPeersAddresses = []v1.PodIP{} } p.log.Error(err, "failed to update peer list") return @@ -129,22 +129,26 @@ func (p *Peers) updatePeers(ctx context.Context, getSelector func() labels.Selec } nodesCount := len(nodes.Items) - addresses := make([]string, nodesCount) + addresses := make([]v1.PodIP, nodesCount) for i, node := range nodes.Items { for _, pod := range pods.Items { if pod.Spec.NodeName == node.Name { - addresses[i] = pod.Status.PodIP + if pod.Status.PodIPs == nil || len(pod.Status.PodIPs) == 0 { + p.log.Info("skipping empty Pod IPs", "node", node.Name, "Pod", pod.Name) + continue + } + addresses[i] = pod.Status.PodIPs[0] } } } setAddresses(addresses) } -func (p *Peers) GetPeersAddresses(role Role) []string { +func (p *Peers) GetPeersAddresses(role Role) []v1.PodIP { p.mutex.Lock() defer p.mutex.Unlock() - var addresses []string + var addresses []v1.PodIP if role == Worker { addresses = p.workerPeersAddresses } else { @@ -152,7 +156,7 @@ func (p *Peers) GetPeersAddresses(role Role) []string { } //we don't want the caller to be able to change the addresses //so we create a deep copy and return it - addressesCopy := make([]string, len(addresses)) + addressesCopy := make([]v1.PodIP, len(addresses)) copy(addressesCopy, addresses) return addressesCopy From cda2f3f9a5b93e592a88eb6b1fc7b0c85a250623 Mon Sep 17 00:00:00 2001 From: Carlo Lobrano Date: Wed, 3 Jul 2024 11:07:45 +0200 Subject: [PATCH 4/4] Return error if cannot update peers address At startup (but it might happen in other moments too), some peers' Pod IP can still be empty, which means that until the next peers update we cannot check the connection with the other peers. Return an error in case a peer's Pod IP is empty. 
Signed-off-by: Carlo Lobrano --- pkg/peers/peers.go | 39 ++++++++++++++++++++++++--------------- 1 file changed, 24 insertions(+), 15 deletions(-) diff --git a/pkg/peers/peers.go b/pkg/peers/peers.go index cd741d6df..3978af5de 100644 --- a/pkg/peers/peers.go +++ b/pkg/peers/peers.go @@ -8,6 +8,7 @@ import ( "github.com/go-logr/logr" commonlabels "github.com/medik8s/common/pkg/labels" + pkgerrors "github.com/pkg/errors" v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" @@ -76,30 +77,37 @@ func (p *Peers) Start(ctx context.Context) error { p.controlPlanePeerSelector = createSelector(hostname, getControlPlaneLabel(myNode)) } - go wait.UntilWithContext(ctx, func(ctx context.Context) { - p.updateWorkerPeers(ctx) - p.updateControlPlanePeers(ctx) - }, p.peerUpdateInterval) + var updatePeersError error + cancellableCtx, cancel := context.WithCancel(ctx) - p.log.Info("peers started") + p.log.Info("peer starting", "name", p.myNodeName) + wait.UntilWithContext(cancellableCtx, func(ctx context.Context) { + updatePeersError = p.updateWorkerPeers(ctx) + if updatePeersError != nil { + cancel() + } + updatePeersError = p.updateControlPlanePeers(ctx) + if updatePeersError != nil { + cancel() + } + }, p.peerUpdateInterval) - <-ctx.Done() - return nil + return updatePeersError } -func (p *Peers) updateWorkerPeers(ctx context.Context) { +func (p *Peers) updateWorkerPeers(ctx context.Context) error { setterFunc := func(addresses []v1.PodIP) { p.workerPeersAddresses = addresses } selectorGetter := func() labels.Selector { return p.workerPeerSelector } - p.updatePeers(ctx, selectorGetter, setterFunc) + return p.updatePeers(ctx, selectorGetter, setterFunc) } -func (p *Peers) updateControlPlanePeers(ctx context.Context) { +func (p *Peers) updateControlPlanePeers(ctx context.Context) error { setterFunc := func(addresses []v1.PodIP) { p.controlPlanePeersAddresses = addresses } selectorGetter := func() labels.Selector { return p.controlPlanePeerSelector } - p.updatePeers(ctx, selectorGetter, setterFunc) + return p.updatePeers(ctx, selectorGetter, setterFunc) } -func (p *Peers) updatePeers(ctx context.Context, getSelector func() labels.Selector, setAddresses func(addresses []v1.PodIP)) { +func (p *Peers) updatePeers(ctx context.Context, getSelector func() labels.Selector, setAddresses func(addresses []v1.PodIP)) error { p.mutex.Lock() defer p.mutex.Unlock() @@ -114,7 +122,7 @@ func (p *Peers) updatePeers(ctx context.Context, getSelector func() labels.Selec p.workerPeersAddresses = []v1.PodIP{} } p.log.Error(err, "failed to update peer list") - return + return pkgerrors.Wrap(err, "failed to update peer list") } pods := v1.PodList{} @@ -126,6 +134,7 @@ func (p *Peers) updatePeers(ctx context.Context, getSelector func() labels.Selec } if err := p.List(readerCtx, &pods, listOptions); err != nil { p.log.Error(err, "could not get pods") + return pkgerrors.Wrap(err, "could not get pods") } nodesCount := len(nodes.Items) @@ -134,14 +143,14 @@ func (p *Peers) updatePeers(ctx context.Context, getSelector func() labels.Selec for _, pod := range pods.Items { if pod.Spec.NodeName == node.Name { if pod.Status.PodIPs == nil || len(pod.Status.PodIPs) == 0 { - p.log.Info("skipping empty Pod IPs", "node", node.Name, "Pod", pod.Name) - continue + return pkgerrors.New(fmt.Sprintf("empty Pod IP for Pod %s on Node %s", pod.Name, node.Name)) } addresses[i] = pod.Status.PodIPs[0] } } } setAddresses(addresses) + return nil } func (p *Peers) GetPeersAddresses(role Role) []v1.PodIP {
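
Note for reviewers less familiar with this pattern: the sketch below shows, in isolation, what patch 4 does in Peers.Start — a failure inside the wait.UntilWithContext loop is surfaced to the caller by cancelling a derived context and returning the captured error. The function names (runPeriodicUpdates, updateOnce) and timings are illustrative placeholders, not code from this repository.

    package main

    import (
        "context"
        "fmt"
        "time"

        "k8s.io/apimachinery/pkg/util/wait"
    )

    // updateOnce stands in for updateWorkerPeers / updateControlPlanePeers:
    // it succeeds twice and then fails, so the loop below runs a few
    // iterations before stopping.
    func updateOnce(attempt int) error {
        if attempt >= 3 {
            return fmt.Errorf("empty Pod IP for a peer (attempt %d)", attempt)
        }
        return nil
    }

    // runPeriodicUpdates mirrors the shape of Peers.Start after patch 4:
    // the periodic loop keeps running until either the parent context ends
    // or one update fails, and the captured error becomes the return value.
    func runPeriodicUpdates(ctx context.Context, interval time.Duration) error {
        var updateErr error
        cancellableCtx, cancel := context.WithCancel(ctx)
        defer cancel()

        attempt := 0
        wait.UntilWithContext(cancellableCtx, func(ctx context.Context) {
            attempt++
            if updateErr = updateOnce(attempt); updateErr != nil {
                // Cancelling the derived context makes UntilWithContext
                // return, which lets us surface updateErr to the caller.
                cancel()
            }
        }, interval)

        return updateErr
    }

    func main() {
        err := runPeriodicUpdates(context.Background(), 100*time.Millisecond)
        fmt.Println("update loop stopped:", err)
    }

In this sketch the first failed update stops the loop and becomes the return value; if the parent context ends first, the function returns nil.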
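
A separate, smaller observation: the existing TODO in getHealthStatusFromPeer ("does this work with IPv6?") is easier to answer now that the peer address is a corev1.PodIP. fmt.Sprintf("%v:%v", endpointIp.IP, port) produces "fd00::1:30001" for an IPv6 pod IP, while an IPv6 literal generally needs to be bracketed in a host:port string. A minimal sketch of an IPv6-safe alternative follows; the port value is only an example, not the operator's actual default.

    package main

    import (
        "fmt"
        "net"
        "strconv"

        corev1 "k8s.io/api/core/v1"
    )

    // peerEndpoint builds the host:port string for a peer's Pod IP.
    // net.JoinHostPort adds the square brackets that IPv6 literals need.
    func peerEndpoint(ip corev1.PodIP, port int) string {
        return net.JoinHostPort(ip.IP, strconv.Itoa(port))
    }

    func main() {
        fmt.Println(peerEndpoint(corev1.PodIP{IP: "10.128.0.12"}, 30001)) // 10.128.0.12:30001
        fmt.Println(peerEndpoint(corev1.PodIP{IP: "fd00::1"}, 30001))     // [fd00::1]:30001
    }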