Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Community PR 3874 Test Run #3973

Draft
wants to merge 13 commits into
base: main
Choose a base branch
from
Draft
3 changes: 3 additions & 0 deletions .changelog/3874.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:sync-catalog
Ignore Endpoints with terminating or non-ready condition
```
212 changes: 111 additions & 101 deletions control-plane/catalog/to-consul/resource.go
Original file line number Diff line number Diff line change
Expand Up @@ -622,55 +622,38 @@ func (t *ServiceResource) generateRegistrations(key string) {

for _, endpointSlice := range endpointSliceList {
for _, endpoint := range endpointSlice.Endpoints {
// Check that the node name exists
// subsetAddr.NodeName is of type *string
if endpoint.NodeName == nil {
continue
}
// Look up the node's ip address by getting node info
node, err := t.Client.CoreV1().Nodes().Get(t.Ctx, *endpoint.NodeName, metav1.GetOptions{})
if err != nil {
t.Log.Error("error getting node info", "error", err)
continue
}

// Set the expected node address type
var expectedType corev1.NodeAddressType
if t.NodePortSync == InternalOnly {
expectedType = corev1.NodeInternalIP
} else {
expectedType = corev1.NodeExternalIP
}

for _, endpointAddr := range endpoint.Addresses {
// Only consider endpoints that are ready
// nil represents an unknown state that can be interpreted as a non-terminating endpoint (assume ready or at least a state we shouldn't ignore)
// Ref: https://github.com/kubernetes/api/blob/5147c1a32f6a0b9b155bb84e59f933e0ff8a3792/discovery/v1/types.go#L128-L151
if endpoint.Conditions.Ready == nil || *endpoint.Conditions.Ready {
// Check that the node name exists
// subsetAddr.NodeName is of type *string
if endpoint.NodeName == nil {
continue
}
// Look up the node's ip address by getting node info
node, err := t.Client.CoreV1().Nodes().Get(t.Ctx, *endpoint.NodeName, metav1.GetOptions{})
if err != nil {
t.Log.Error("error getting node info", "error", err)
continue
}

// Find the ip address for the node and
// create the Consul service using it
var found bool
for _, address := range node.Status.Addresses {
if address.Type == expectedType {
found = true
r := baseNode
rs := baseService
r.Service = &rs
r.Service.ID = serviceID(r.Service.Service, endpointAddr)
r.Service.Address = address.Address

t.consulMap[key] = append(t.consulMap[key], &r)
// Only consider the first address that matches. In some cases
// there will be multiple addresses like when using AWS CNI.
// In those cases, Kubernetes will ensure eth0 is always the first
// address in the list.
// See https://github.com/kubernetes/kubernetes/blob/b559434c02f903dbcd46ee7d6c78b216d3f0aca0/staging/src/k8s.io/legacy-cloud-providers/aws/aws.go#L1462-L1464
break
}
// Set the expected node address type
var expectedType corev1.NodeAddressType
if t.NodePortSync == InternalOnly {
expectedType = corev1.NodeInternalIP
} else {
expectedType = corev1.NodeExternalIP
}

// If an ExternalIP wasn't found, and ExternalFirst is set,
// use an InternalIP
if t.NodePortSync == ExternalFirst && !found {
for _, endpointAddr := range endpoint.Addresses {

// Find the ip address for the node and
// create the Consul service using it
var found bool
for _, address := range node.Status.Addresses {
if address.Type == corev1.NodeInternalIP {
if address.Type == expectedType {
found = true
r := baseNode
rs := baseService
r.Service = &rs
Expand All @@ -686,8 +669,30 @@ func (t *ServiceResource) generateRegistrations(key string) {
break
}
}
}

// If an ExternalIP wasn't found, and ExternalFirst is set,
// use an InternalIP
if t.NodePortSync == ExternalFirst && !found {
for _, address := range node.Status.Addresses {
if address.Type == corev1.NodeInternalIP {
r := baseNode
rs := baseService
r.Service = &rs
r.Service.ID = serviceID(r.Service.Service, endpointAddr)
r.Service.Address = address.Address

t.consulMap[key] = append(t.consulMap[key], &r)
// Only consider the first address that matches. In some cases
// there will be multiple addresses like when using AWS CNI.
// In those cases, Kubernetes will ensure eth0 is always the first
// address in the list.
// See https://github.com/kubernetes/kubernetes/blob/b559434c02f903dbcd46ee7d6c78b216d3f0aca0/staging/src/k8s.io/legacy-cloud-providers/aws/aws.go#L1462-L1464
break
}
}
}

}
}
}
}
Expand Down Expand Up @@ -739,67 +744,72 @@ func (t *ServiceResource) registerServiceInstance(
}
}
for _, endpoint := range endpointSlice.Endpoints {
for _, endpointAddr := range endpoint.Addresses {

var addr string
// Use the address and port from the Ingress resource if
// ingress-sync is enabled and the service has an ingress
// resource that references it.
if t.EnableIngress && t.isIngressService(key) {
addr = t.serviceHostnameMap[key].hostName
epPort = int(t.serviceHostnameMap[key].port)
} else {
addr = endpointAddr
if addr == "" && useHostname {
addr = *endpoint.Hostname
// Only consider endpoints that are ready
// nil represents an unknown state that can be interpreted as a non-terminating endpoint (assume ready or at least a state we shouldn't ignore)
// Ref: https://github.com/kubernetes/api/blob/5147c1a32f6a0b9b155bb84e59f933e0ff8a3792/discovery/v1/types.go#L128-L151
if endpoint.Conditions.Ready == nil || *endpoint.Conditions.Ready {
for _, endpointAddr := range endpoint.Addresses {

var addr string
// Use the address and port from the Ingress resource if
// ingress-sync is enabled and the service has an ingress
// resource that references it.
if t.EnableIngress && t.isIngressService(key) {
addr = t.serviceHostnameMap[key].hostName
epPort = int(t.serviceHostnameMap[key].port)
} else {
addr = endpointAddr
if addr == "" && useHostname {
addr = *endpoint.Hostname
}
if addr == "" {
continue
}
}
if addr == "" {

// Its not clear whether K8S guarantees ready addresses to
// be unique so we maintain a set to prevent duplicates just
// in case.
if _, ok := seen[addr]; ok {
continue
}
}

// Its not clear whether K8S guarantees ready addresses to
// be unique so we maintain a set to prevent duplicates just
// in case.
if _, ok := seen[addr]; ok {
continue
}
seen[addr] = struct{}{}
seen[addr] = struct{}{}

r := baseNode
rs := baseService
r.Service = &rs
r.Service.ID = serviceID(r.Service.Service, addr)
r.Service.Address = addr
r.Service.Port = epPort
r.Service.Meta = make(map[string]string)
// Deepcopy baseService.Meta into r.Service.Meta as baseService is shared
// between all nodes of a service
for k, v := range baseService.Meta {
r.Service.Meta[k] = v
}
if endpoint.TargetRef != nil {
r.Service.Meta[ConsulK8SRefValue] = endpoint.TargetRef.Name
r.Service.Meta[ConsulK8SRefKind] = endpoint.TargetRef.Kind
}
if endpoint.NodeName != nil {
r.Service.Meta[ConsulK8SNodeName] = *endpoint.NodeName
}
if endpoint.Zone != nil {
r.Service.Meta[ConsulK8STopologyZone] = *endpoint.Zone
}

r := baseNode
rs := baseService
r.Service = &rs
r.Service.ID = serviceID(r.Service.Service, addr)
r.Service.Address = addr
r.Service.Port = epPort
r.Service.Meta = make(map[string]string)
// Deepcopy baseService.Meta into r.Service.Meta as baseService is shared
// between all nodes of a service
for k, v := range baseService.Meta {
r.Service.Meta[k] = v
}
if endpoint.TargetRef != nil {
r.Service.Meta[ConsulK8SRefValue] = endpoint.TargetRef.Name
r.Service.Meta[ConsulK8SRefKind] = endpoint.TargetRef.Kind
}
if endpoint.NodeName != nil {
r.Service.Meta[ConsulK8SNodeName] = *endpoint.NodeName
}
if endpoint.Zone != nil {
r.Service.Meta[ConsulK8STopologyZone] = *endpoint.Zone
}
r.Check = &consulapi.AgentCheck{
CheckID: consulHealthCheckID(endpointSlice.Namespace, serviceID(r.Service.Service, addr)),
Name: consulKubernetesCheckName,
Namespace: baseService.Namespace,
Type: consulKubernetesCheckType,
Status: consulapi.HealthPassing,
ServiceID: serviceID(r.Service.Service, addr),
Output: kubernetesSuccessReasonMsg,
}

r.Check = &consulapi.AgentCheck{
CheckID: consulHealthCheckID(endpointSlice.Namespace, serviceID(r.Service.Service, addr)),
Name: consulKubernetesCheckName,
Namespace: baseService.Namespace,
Type: consulKubernetesCheckType,
Status: consulapi.HealthPassing,
ServiceID: serviceID(r.Service.Service, addr),
Output: kubernetesSuccessReasonMsg,
t.consulMap[key] = append(t.consulMap[key], &r)
}

t.consulMap[key] = append(t.consulMap[key], &r)
}
}
}
Expand Down
15 changes: 13 additions & 2 deletions control-plane/catalog/to-consul/resource_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2141,7 +2141,7 @@ func createEndpointSlice(t *testing.T, client *fake.Clientset, serviceName strin
{
Addresses: []string{"1.1.1.1"},
Conditions: discoveryv1.EndpointConditions{
Ready: pointer.Bool(true),
Ready: nil,
Serving: pointer.Bool(true),
Terminating: pointer.Bool(false),
},
Expand All @@ -2154,11 +2154,22 @@ func createEndpointSlice(t *testing.T, client *fake.Clientset, serviceName strin
Conditions: discoveryv1.EndpointConditions{
Ready: pointer.Bool(true),
Serving: pointer.Bool(true),
Terminating: pointer.Bool(false),
Terminating: nil,
},
NodeName: &node2,
Zone: pointer.String("us-west-2b"),
},
{
Addresses: []string{"3.3.3.3"},
Conditions: discoveryv1.EndpointConditions{
Ready: pointer.Bool(false),
Serving: pointer.Bool(false),
Terminating: pointer.Bool(true),
},
TargetRef: &targetRef,
NodeName: &node1,
Zone: pointer.String("us-west-2a"),
},
},
Ports: []discoveryv1.EndpointPort{
{
Expand Down
Loading