diff --git a/pkg/backend/endpoint_resolver.go b/pkg/backend/endpoint_resolver.go
index 55d7a50f6..8c8863090 100644
--- a/pkg/backend/endpoint_resolver.go
+++ b/pkg/backend/endpoint_resolver.go
@@ -3,6 +3,7 @@ package backend
 import (
 	"context"
 	"fmt"
+	"net"
 
 	awssdk "github.com/aws/aws-sdk-go-v2/aws"
 	"github.com/go-logr/logr"
@@ -27,12 +28,17 @@ var ErrNotFound = errors.New("backend not found")
 type EndpointResolver interface {
 	// ResolvePodEndpoints will resolve endpoints backed by pods directly.
 	// returns resolved podEndpoints and whether there are unready endpoints that can potentially turn ready in future reconciles.
-	ResolvePodEndpoints(ctx context.Context, svcKey types.NamespacedName, port intstr.IntOrString,
-		opts ...EndpointResolveOption) ([]PodEndpoint, bool, error)
+	ResolvePodEndpoints(ctx context.Context, svcKey types.NamespacedName, svc *corev1.Service, port intstr.IntOrString, opts ...EndpointResolveOption) ([]IpEndpoint, bool, error)
 
 	// ResolveNodePortEndpoints will resolve endpoints backed by nodePort.
 	ResolveNodePortEndpoints(ctx context.Context, svcKey types.NamespacedName, port intstr.IntOrString, opts ...EndpointResolveOption) ([]NodePortEndpoint, error)
+
+	// FindService finds a k8s service by its namespaced name.
+	FindService(ctx context.Context, svcKey types.NamespacedName) (*corev1.Service, error)
+
+	// ResolveExternalNameEndpoints will resolve endpoints for an ExternalName service via DNS.
+	ResolveExternalNameEndpoints(ctx context.Context, svc *corev1.Service, port intstr.IntOrString) ([]IpEndpoint, error)
 }
 
 // NewDefaultEndpointResolver constructs new defaultEndpointResolver
@@ -42,6 +48,7 @@ func NewDefaultEndpointResolver(k8sClient client.Client, podInfoRepo k8s.PodInfo
 		podInfoRepo:          podInfoRepo,
 		failOpenEnabled:      failOpenEnabled,
 		endpointSliceEnabled: endpointSliceEnabled,
+		dnsResolver:          net.DefaultResolver,
 		logger:               logger,
 	}
 }
@@ -58,13 +65,34 @@ type defaultEndpointResolver struct {
 	// [Pod Endpoint] whether to use endpointSlice instead of endpoints
 	endpointSliceEnabled bool
 	logger               logr.Logger
+	// dnsResolver to use for resolving external names
+	dnsResolver dnsResolver
+}
+
+type dnsResolver interface {
+	LookupHost(ctx context.Context, host string) (addrs []string, err error)
+}
+
+func (r *defaultEndpointResolver) ResolveExternalNameEndpoints(ctx context.Context, svc *corev1.Service, port intstr.IntOrString) ([]IpEndpoint, error) {
+	if port.Type == intstr.String {
+		return nil, fmt.Errorf("port of target group must be numeric for external name")
+	}
+	addrs, err := r.dnsResolver.LookupHost(ctx, svc.Spec.ExternalName)
+	if err != nil {
+		return nil, err
+	}
+	endpoints := make([]IpEndpoint, len(addrs))
+	for i, ip := range addrs {
+		endpoints[i] = IpEndpoint{IP: ip, Port: port.IntVal}
+	}
+	return endpoints, nil
 }
 
-func (r *defaultEndpointResolver) ResolvePodEndpoints(ctx context.Context, svcKey types.NamespacedName, port intstr.IntOrString, opts ...EndpointResolveOption) ([]PodEndpoint, bool, error) {
+func (r *defaultEndpointResolver) ResolvePodEndpoints(ctx context.Context, svcKey types.NamespacedName, svc *corev1.Service, port intstr.IntOrString, opts ...EndpointResolveOption) ([]IpEndpoint, bool, error) {
 	resolveOpts := defaultEndpointResolveOptions()
 	resolveOpts.ApplyOptions(opts)
 
-	_, svcPort, err := r.findServiceAndServicePort(ctx, svcKey, port)
+	_, svcPort, err := r.findServicePort(svc, port)
 	if err != nil {
 		return nil, false, err
 	}
@@ -140,9 +168,9 @@ func (r *defaultEndpointResolver) computeServiceEndpointsData(ctx context.Contex
 	return endpointsDataList, nil
 }
 
-func (r *defaultEndpointResolver) resolvePodEndpointsWithEndpointsData(ctx context.Context, svcKey types.NamespacedName, svcPort corev1.ServicePort, endpointsDataList []EndpointsData, podReadinessGates []corev1.PodConditionType) ([]PodEndpoint, bool, error) {
-	var readyPodEndpoints []PodEndpoint
-	var unknownPodEndpoints []PodEndpoint
+func (r *defaultEndpointResolver) resolvePodEndpointsWithEndpointsData(ctx context.Context, svcKey types.NamespacedName, svcPort corev1.ServicePort, endpointsDataList []EndpointsData, podReadinessGates []corev1.PodConditionType) ([]IpEndpoint, bool, error) {
+	var readyPodEndpoints []IpEndpoint
+	var unknownPodEndpoints []IpEndpoint
 	containsPotentialReadyEndpoints := false
 
 	for _, epsData := range endpointsDataList {
@@ -175,7 +203,7 @@ func (r *defaultEndpointResolver) resolvePodEndpointsWithEndpointsData(ctx conte
 				continue
 			}
 
-			podEndpoint := buildPodEndpoint(pod, epAddr, epPort)
+			podEndpoint := buildPodEndpoint(&pod, epAddr, epPort)
 			// Recommendation from Kubernetes is to consider unknown ready status as ready (ready == nil)
 			if ep.Conditions.Ready == nil || *ep.Conditions.Ready {
 				readyPodEndpoints = append(readyPodEndpoints, podEndpoint)
@@ -220,13 +248,14 @@ func (r *defaultEndpointResolver) resolvePodEndpointsWithEndpointsData(ctx conte
 }
 
 func (r *defaultEndpointResolver) findServiceAndServicePort(ctx context.Context, svcKey types.NamespacedName, port intstr.IntOrString) (*corev1.Service, corev1.ServicePort, error) {
-	svc := &corev1.Service{}
-	if err := r.k8sClient.Get(ctx, svcKey, svc); err != nil {
-		if apierrors.IsNotFound(err) {
-			return nil, corev1.ServicePort{}, fmt.Errorf("%w: %v", ErrNotFound, err.Error())
-		}
+	svc, err := r.FindService(ctx, svcKey)
+	if err != nil {
 		return nil, corev1.ServicePort{}, err
 	}
+	return r.findServicePort(svc, port)
+}
+
+func (r *defaultEndpointResolver) findServicePort(svc *corev1.Service, port intstr.IntOrString) (*corev1.Service, corev1.ServicePort, error) {
 	svcPort, err := k8s.LookupServicePort(svc, port)
 	if err != nil {
 		return nil, corev1.ServicePort{}, fmt.Errorf("%w: %v", ErrNotFound, err.Error())
@@ -235,6 +264,17 @@ func (r *defaultEndpointResolver) findServiceAndServicePort(ctx context.Context,
 	return svc, svcPort, nil
 }
 
+func (r *defaultEndpointResolver) FindService(ctx context.Context, svcKey types.NamespacedName) (*corev1.Service, error) {
+	svc := &corev1.Service{}
+	if err := r.k8sClient.Get(ctx, svcKey, svc); err != nil {
+		if apierrors.IsNotFound(err) {
+			return nil, fmt.Errorf("%w: %v", ErrNotFound, err.Error())
+		}
+		return nil, err
+	}
+	return svc, nil
+}
+
 // filterNodesByReadyConditionStatus will filter out nodes that matches specified ready condition status
 func filterNodesByReadyConditionStatus(nodes []*corev1.Node, readyCondStatus corev1.ConditionStatus) []*corev1.Node {
 	var nodesWithMatchingReadyStatus []*corev1.Node
@@ -287,8 +327,8 @@ func buildEndpointsDataFromEndpointSliceList(epsList *discovery.EndpointSliceLis
 	return endpointsDataList
 }
 
-func buildPodEndpoint(pod k8s.PodInfo, epAddr string, port int32) PodEndpoint {
-	return PodEndpoint{
+func buildPodEndpoint(pod *k8s.PodInfo, epAddr string, port int32) IpEndpoint {
+	return IpEndpoint{
 		IP:   epAddr,
 		Port: port,
 		Pod:  pod,
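Note (editor's sketch, not part of the patch): a minimal illustration, assuming only the interface above, of how a caller is expected to drive the split API — look the Service up once, then branch on its type. The helper name `resolveAnyEndpoints` is hypothetical:

```go
// Hypothetical caller-side helper; EndpointResolver, IpEndpoint and the
// imports (context, corev1, types, intstr) are the ones this package already uses.
func resolveAnyEndpoints(ctx context.Context, r EndpointResolver, svcKey types.NamespacedName,
	port intstr.IntOrString, opts ...EndpointResolveOption) ([]IpEndpoint, bool, error) {
	svc, err := r.FindService(ctx, svcKey)
	if err != nil {
		return nil, false, err // wraps ErrNotFound when the Service does not exist
	}
	if svc.Spec.Type == corev1.ServiceTypeExternalName {
		// DNS-backed endpoints: there are no pods behind them, so nothing
		// can "potentially turn ready" in a later reconcile.
		endpoints, err := r.ResolveExternalNameEndpoints(ctx, svc, port)
		return endpoints, false, err
	}
	return r.ResolvePodEndpoints(ctx, svcKey, svc, port, opts...)
}
```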
diff --git a/pkg/backend/endpoint_resolver_test.go b/pkg/backend/endpoint_resolver_test.go
index 815fbe9c1..5e6db0409 100644
--- a/pkg/backend/endpoint_resolver_test.go
+++ b/pkg/backend/endpoint_resolver_test.go
@@ -689,7 +689,6 @@ func Test_defaultEndpointResolver_ResolvePodEndpoints(t *testing.T) {
 	}
 	type env struct {
 		nodes          []*corev1.Node
-		services       []*corev1.Service
 		endpointsList  []*corev1.Endpoints
 		endpointSlices []*discovery.EndpointSlice
 	}
@@ -699,16 +698,16 @@
 		endpointSliceEnabled bool
 	}
 	type args struct {
-		svcKey types.NamespacedName
-		port   intstr.IntOrString
-		opts   []EndpointResolveOption
+		svc  *corev1.Service
+		port intstr.IntOrString
+		opts []EndpointResolveOption
 	}
 	tests := []struct {
 		name                                string
 		env                                 env
 		fields                              fields
 		args                                args
-		want                                []PodEndpoint
+		want                                []IpEndpoint
 		wantContainsPotentialReadyEndpoints bool
 		wantErr                             error
 	}{
@@ -716,7 +715,6 @@
 		{
 			name: "[with endpoints][with failOpen] choose every ready pod only when there are ready pods",
 			env: env{
 				nodes:         []*corev1.Node{nodeA, nodeB, nodeC},
-				services:      []*corev1.Service{svc1},
 				endpointsList: []*corev1.Endpoints{ep1},
 			},
 			fields: fields{
@@ -761,20 +759,20 @@
 				},
 			},
 			args: args{
-				svcKey: k8s.NamespacedName(svc1),
-				port:   intstr.FromString("http"),
-				opts:   nil,
+				svc:  svc1,
+				port: intstr.FromString("http"),
+				opts: nil,
 			},
-			want: []PodEndpoint{
+			want: []IpEndpoint{
 				{
 					IP:   "192.168.1.1",
 					Port: 8080,
-					Pod:  pod1,
+					Pod:  &pod1,
 				},
 				{
 					IP:   "192.168.1.4",
 					Port: 8080,
-					Pod:  pod4,
+					Pod:  &pod4,
 				},
 			},
 			wantContainsPotentialReadyEndpoints: false,
@@ -783,7 +781,6 @@
 			name: "[with endpointSlices][with failOpen] choose every ready pod only when there are ready pods",
 			env: env{
 				nodes:          []*corev1.Node{nodeA, nodeB, nodeC},
-				services:       []*corev1.Service{svc1},
 				endpointSlices: []*discovery.EndpointSlice{eps1},
 			},
 			fields: fields{
@@ -833,20 +830,20 @@
 				},
 			},
 			args: args{
-				svcKey: k8s.NamespacedName(svc1),
-				port:   intstr.FromString("http"),
-				opts:   nil,
+				svc:  svc1,
+				port: intstr.FromString("http"),
+				opts: nil,
 			},
-			want: []PodEndpoint{
+			want: []IpEndpoint{
 				{
 					IP:   "192.168.1.1",
 					Port: 8080,
-					Pod:  pod1,
+					Pod:  &pod1,
 				},
 				{
 					IP:   "192.168.1.4",
 					Port: 8080,
-					Pod:  pod4,
+					Pod:  &pod4,
 				},
 			},
 			wantContainsPotentialReadyEndpoints: false,
@@ -855,7 +852,6 @@
 			name: "[with endpointSlices][without failOpen] choose every ready pod only when there are ready pods",
 			env: env{
 				nodes:          []*corev1.Node{nodeA, nodeB, nodeC},
-				services:       []*corev1.Service{svc1},
 				endpointSlices: []*discovery.EndpointSlice{eps1},
 			},
 			fields: fields{
@@ -905,20 +901,20 @@
 				},
 			},
 			args: args{
-				svcKey: k8s.NamespacedName(svc1),
-				port:   intstr.FromString("http"),
-				opts:   nil,
+				svc:  svc1,
+				port: intstr.FromString("http"),
+				opts: nil,
 			},
-			want: []PodEndpoint{
+			want: []IpEndpoint{
 				{
 					IP:   "192.168.1.1",
 					Port: 8080,
-					Pod:  pod1,
+					Pod:  &pod1,
 				},
 				{
 					IP:   "192.168.1.4",
 					Port: 8080,
-					Pod:  pod4,
+					Pod:  &pod4,
 				},
 			},
 			wantContainsPotentialReadyEndpoints: false,
@@ -927,7 +923,6 @@
 			name: "[with endpointSlices][with failOpen] choose every unknown pod when there are no ready pods",
 			env: env{
 				nodes:          []*corev1.Node{nodeA, nodeB, nodeC},
-				services:       []*corev1.Service{svc1},
 				endpointSlices: []*discovery.EndpointSlice{eps2},
 			},
 			fields: fields{
@@ -967,25 +962,25 @@
 				},
 			},
 			args: args{
-				svcKey: k8s.NamespacedName(svc1),
-				port:   intstr.FromString("http"),
-				opts:   nil,
+				svc:  svc1,
+				port: intstr.FromString("http"),
+				opts: nil,
 			},
-			want: []PodEndpoint{
+			want: []IpEndpoint{
 				{
 					IP:   "192.168.1.2",
 					Port: 8080,
-					Pod:  pod2,
+					Pod:  &pod2,
 				},
 				{
 					IP:   "192.168.1.5",
 					Port: 8080,
-					Pod:  pod5,
+					Pod:  &pod5,
 				},
 				{
 					IP:   "192.168.1.8",
 					Port: 8080,
-					Pod:  pod8,
+					Pod:  &pod8,
 				},
 			},
 			wantContainsPotentialReadyEndpoints: false,
@@ -994,7 +989,6 @@
 			name: "[with endpointSlices][without failOpen] don't choose unknown pod when there are no ready pods",
 			env: env{
 				nodes:          []*corev1.Node{nodeA, nodeB, nodeC},
-				services:       []*corev1.Service{svc1},
 				endpointSlices: []*discovery.EndpointSlice{eps2},
 			},
 			fields: fields{
@@ -1034,9 +1028,9 @@
 				},
 			},
 			args: args{
-				svcKey: k8s.NamespacedName(svc1),
-				port:   intstr.FromString("http"),
-				opts:   nil,
+				svc:  svc1,
+				port: intstr.FromString("http"),
+				opts: nil,
 			},
 			want:                                nil,
 			wantContainsPotentialReadyEndpoints: false,
@@ -1045,7 +1039,6 @@
 			name: "[with endpointSlices][with failOpen] choose every ready pod only when there are ready pods - some pod have readinessGate",
 			env: env{
 				nodes:          []*corev1.Node{nodeA, nodeB, nodeC},
-				services:       []*corev1.Service{svc1},
 				endpointSlices: []*discovery.EndpointSlice{eps1},
 			},
 			fields: fields{
@@ -1095,20 +1088,20 @@
 				},
 			},
 			args: args{
-				svcKey: k8s.NamespacedName(svc1),
-				port:   intstr.FromString("http"),
-				opts:   []EndpointResolveOption{WithPodReadinessGate("custom-condition")},
+				svc:  svc1,
+				port: intstr.FromString("http"),
+				opts: []EndpointResolveOption{WithPodReadinessGate("custom-condition")},
 			},
-			want: []PodEndpoint{
+			want: []IpEndpoint{
 				{
 					IP:   "192.168.1.1",
 					Port: 8080,
-					Pod:  pod1,
+					Pod:  &pod1,
 				},
 				{
 					IP:   "192.168.1.4",
 					Port: 8080,
-					Pod:  pod4,
+					Pod:  &pod4,
 				},
 			},
 			wantContainsPotentialReadyEndpoints: true,
@@ -1117,7 +1110,6 @@
 			name: "[with endpointSlices][with failOpen] choose every ready pod only when there are ready pods - no pod have readinessGate",
 			env: env{
 				nodes:          []*corev1.Node{nodeA, nodeB, nodeC},
-				services:       []*corev1.Service{svc1},
 				endpointSlices: []*discovery.EndpointSlice{eps3},
 			},
 			fields: fields{
@@ -1162,20 +1154,20 @@
 				},
 			},
 			args: args{
-				svcKey: k8s.NamespacedName(svc1),
-				port:   intstr.FromString("http"),
-				opts:   []EndpointResolveOption{WithPodReadinessGate("custom-condition")},
+				svc:  svc1,
+				port: intstr.FromString("http"),
+				opts: []EndpointResolveOption{WithPodReadinessGate("custom-condition")},
 			},
-			want: []PodEndpoint{
+			want: []IpEndpoint{
 				{
 					IP:   "192.168.1.1",
 					Port: 8080,
-					Pod:  pod1,
+					Pod:  &pod1,
 				},
 				{
 					IP:   "192.168.1.4",
 					Port: 8080,
-					Pod:  pod4,
+					Pod:  &pod4,
 				},
 			},
 			wantContainsPotentialReadyEndpoints: false,
@@ -1184,7 +1176,6 @@
 			name: "[with endpoints][with failOpen] choose every ready pod only when there are ready pods - ignore pods don't exists",
 			env: env{
 				nodes:         []*corev1.Node{nodeA, nodeB, nodeC},
-				services:      []*corev1.Service{svc1},
 				endpointsList: []*corev1.Endpoints{ep1},
 			},
 			fields: fields{
@@ -1229,66 +1220,46 @@
 				},
 			},
 			args: args{
-				svcKey: k8s.NamespacedName(svc1),
-				port:   intstr.FromString("http"),
-				opts:   nil,
+				svc:  svc1,
+				port: intstr.FromString("http"),
+				opts: nil,
 			},
-			want: []PodEndpoint{
+			want: []IpEndpoint{
 				{
 					IP:   "192.168.1.4",
 					Port: 8080,
-					Pod:  pod4,
+					Pod:  &pod4,
 				},
 			},
 			wantContainsPotentialReadyEndpoints: true,
 		},
-		{
-			name: "service not found",
-			env: env{
-				services:      []*corev1.Service{},
-				endpointsList: []*corev1.Endpoints{},
-			},
-			fields: fields{
-				podInfoRepoGetCalls: []podInfoRepoGetCall{},
-			},
-			args: args{
-				svcKey: k8s.NamespacedName(svc1),
-				port:   intstr.FromString("http"),
-				opts:   nil,
-			},
-			want:                                []PodEndpoint{},
-			wantContainsPotentialReadyEndpoints: false,
-			wantErr:                             fmt.Errorf("%w: %v", ErrNotFound, "services \"svc-1\" not found"),
-		},
 		{
 			name: "service port not found",
 			env: env{
-				services:      []*corev1.Service{svc1WithoutHTTPPort},
 				endpointsList: []*corev1.Endpoints{},
 			},
 			fields: fields{
				podInfoRepoGetCalls: []podInfoRepoGetCall{},
 			},
 			args: args{
-				svcKey: k8s.NamespacedName(svc1),
-				port:   intstr.FromString("http"),
-				opts:   nil,
+				svc:  svc1WithoutHTTPPort,
+				port: intstr.FromString("http"),
+				opts: nil,
 			},
 			wantErr: fmt.Errorf("%w: %v", ErrNotFound, "unable to find port http on service test-ns/svc-1"),
 		},
 		{
 			name: "endpoints not found",
 			env: env{
-				services:      []*corev1.Service{svc1},
 				endpointsList: []*corev1.Endpoints{},
 			},
 			fields: fields{
 				podInfoRepoGetCalls: []podInfoRepoGetCall{},
 			},
 			args: args{
-				svcKey: k8s.NamespacedName(svc1),
-				port:   intstr.FromString("http"),
-				opts:   nil,
+				svc:  svc1,
+				port: intstr.FromString("http"),
+				opts: nil,
 			},
 			wantErr: fmt.Errorf("%w: %v", ErrNotFound, "endpoints \"svc-1\" not found"),
 		},
@@ -1311,9 +1282,6 @@ func Test_defaultEndpointResolver_ResolvePodEndpoints(t *testing.T) {
 			for _, node := range tt.env.nodes {
 				assert.NoError(t, k8sClient.Create(ctx, node.DeepCopy()))
 			}
-			for _, svc := range tt.env.services {
-				assert.NoError(t, k8sClient.Create(ctx, svc.DeepCopy()))
-			}
 			for _, endpoints := range tt.env.endpointsList {
 				assert.NoError(t, k8sClient.Create(ctx, endpoints.DeepCopy()))
 			}
@@ -1328,14 +1296,14 @@ func Test_defaultEndpointResolver_ResolvePodEndpoints(t *testing.T) {
 				endpointSliceEnabled: tt.fields.endpointSliceEnabled,
 				logger:               logr.New(&log.NullLogSink{}),
 			}
-			got, gotContainsPotentialReadyEndpoints, err := r.ResolvePodEndpoints(ctx, tt.args.svcKey, tt.args.port, tt.args.opts...)
+			got, gotContainsPotentialReadyEndpoints, err := r.ResolvePodEndpoints(ctx, k8s.NamespacedName(tt.args.svc), tt.args.svc, tt.args.port, tt.args.opts...)
 			if tt.wantErr != nil {
 				assert.EqualError(t, err, tt.wantErr.Error())
 			} else {
 				assert.NoError(t, err)
 				opt := cmp.Options{
 					equality.IgnoreFakeClientPopulatedFields(),
-					cmpopts.SortSlices(func(lhs PodEndpoint, rhs PodEndpoint) bool {
+					cmpopts.SortSlices(func(lhs IpEndpoint, rhs IpEndpoint) bool {
 						return lhs.IP < rhs.IP
 					}),
 				}
@@ -1347,6 +1315,61 @@ func Test_defaultEndpointResolver_ResolvePodEndpoints(t *testing.T) {
 	}
 }
 
+type mockDns map[string][]string
+
+func (m mockDns) LookupHost(_ context.Context, host string) (addrs []string, err error) {
+	var ok bool
+	addrs, ok = m[host]
+	if !ok {
+		err = fmt.Errorf("no IPs are registered for %s", host)
+	}
+	return
+}
+
+func Test_defaultEndpointResolver_ResolveExternalNameEndpoints(t *testing.T) {
+	svcExternalName := &corev1.Service{
+		ObjectMeta: metav1.ObjectMeta{
+			Namespace: "testNS",
+			Name:      "svc-1",
+		},
+		Spec: corev1.ServiceSpec{
+			Type:         corev1.ServiceTypeExternalName,
+			ExternalName: "service.in.my.vpc",
+		},
+	}
+	ctx := context.Background()
+
+	want := []IpEndpoint{
+		{
+			IP:   "10.10.10.10",
+			Port: 80,
+		},
+		{
+			IP:   "10.10.11.10",
+			Port: 80,
+		},
+	}
+	opt := cmp.Options{
+		equality.IgnoreFakeClientPopulatedFields(),
+		cmpopts.SortSlices(func(lhs IpEndpoint, rhs IpEndpoint) bool {
+			return lhs.IP < rhs.IP
+		}),
+	}
+
+	r := &defaultEndpointResolver{
+		logger:      logr.New(&log.NullLogSink{}),
+		dnsResolver: mockDns{"service.in.my.vpc": []string{"10.10.10.10", "10.10.11.10"}},
+	}
+	got, err := r.ResolveExternalNameEndpoints(ctx, svcExternalName, intstr.FromInt(80))
+	assert.NoError(t, err)
+	assert.True(t, cmp.Equal(want, got, opt),
+		"diff: %v", cmp.Diff(want, got, opt))
+
+	r.dnsResolver = mockDns{}
+	_, err = r.ResolveExternalNameEndpoints(ctx, svcExternalName, intstr.FromInt(80))
+	assert.EqualError(t, err, fmt.Errorf("no IPs are registered for %s", svcExternalName.Spec.ExternalName).Error())
+}
+
 func Test_defaultEndpointResolver_ResolveNodePortEndpoints(t *testing.T) {
 	testNS := "test-ns"
 	node1 := &corev1.Node{
@@ -2079,7 +2102,7 @@ func Test_defaultEndpointResolver_findServiceAndServicePort(t *testing.T) {
 			} else {
 				opt := cmp.Options{
 					equality.IgnoreFakeClientPopulatedFields(),
-					cmpopts.SortSlices(func(lhs PodEndpoint, rhs PodEndpoint) bool {
+					cmpopts.SortSlices(func(lhs IpEndpoint, rhs IpEndpoint) bool {
 						return lhs.IP < rhs.IP
 					}),
 				}
@@ -2596,7 +2619,7 @@ func Test_buildPodEndpoint(t *testing.T) {
 	tests := []struct {
 		name string
 		args args
-		want PodEndpoint
+		want IpEndpoint
 	}{
 		{
 			name: "base case",
 			args: args{
 				pod: k8s.PodInfo{
 					Key: types.NamespacedName{Name: "sample-node"},
 				},
 				epAddr: "192.168.1.1",
 				port:   80,
 			},
-			want: PodEndpoint{
+			want: IpEndpoint{
 				IP:   "192.168.1.1",
 				Port: 80,
-				Pod: k8s.PodInfo{
+				Pod: &k8s.PodInfo{
 					Key: types.NamespacedName{Name: "sample-node"},
 				},
 			},
@@ -2618,7 +2641,7 @@ func Test_buildPodEndpoint(t *testing.T) {
 	}
 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {
-			got := buildPodEndpoint(tt.args.pod, tt.args.epAddr, tt.args.port)
+			got := buildPodEndpoint(&tt.args.pod, tt.args.epAddr, tt.args.port)
 			assert.Equal(t, tt.want, got)
 		})
 	}
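Note (editor's sketch, not part of the patch): the new test doesn't pin down the named-port guard in ResolveExternalNameEndpoints. A test for that branch could reuse the mockDns double above; the error text is copied from the implementation:

```go
func Test_defaultEndpointResolver_ResolveExternalNameEndpoints_namedPort(t *testing.T) {
	r := &defaultEndpointResolver{
		logger:      logr.New(&log.NullLogSink{}),
		dnsResolver: mockDns{"service.in.my.vpc": []string{"10.10.10.10"}},
	}
	svc := &corev1.Service{
		Spec: corev1.ServiceSpec{
			Type:         corev1.ServiceTypeExternalName,
			ExternalName: "service.in.my.vpc",
		},
	}
	// Named ports cannot be looked up without pods behind the service,
	// so the resolver must fail fast before any DNS query happens.
	_, err := r.ResolveExternalNameEndpoints(context.Background(), svc, intstr.FromString("http"))
	assert.EqualError(t, err, "port of target group must be numeric for external name")
}
```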
diff --git a/pkg/backend/endpoint_types.go b/pkg/backend/endpoint_types.go
index f928ca4f0..128dd3a07 100644
--- a/pkg/backend/endpoint_types.go
+++ b/pkg/backend/endpoint_types.go
@@ -12,18 +12,18 @@ type Endpoint interface {
 	GetIdentifier(includeTimestamp bool) string
 }
 
-// An endpoint provided by pod directly.
-type PodEndpoint struct {
+// IpEndpoint is an endpoint backed by an IP address.
+type IpEndpoint struct {
 	// Pod's IP.
 	IP string
 	// Pod's container port.
 	Port int32
 	// Pod that provides this endpoint.
-	Pod k8s.PodInfo
+	Pod *k8s.PodInfo
 }
 
-func (e PodEndpoint) GetIdentifier(includeTimestamp bool) string {
-	if includeTimestamp {
+func (e IpEndpoint) GetIdentifier(includeTimestamp bool) string {
+	if includeTimestamp && e.Pod != nil {
 		return fmt.Sprintf("%s:%d:%d", e.IP, e.Port, e.Pod.CreationTime.UnixMilli())
 	}
 	return fmt.Sprintf("%s:%d", e.IP, e.Port)
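Note (editor's sketch, not part of the patch): the `e.Pod != nil` guard matters because DNS-resolved endpoints carry no pod. An illustration with hypothetical values of how the identifier degrades when the pod is absent:

```go
// withPod: the identifier can embed the pod's creation time for staleness checks.
withPod := IpEndpoint{IP: "192.168.1.1", Port: 80, Pod: &k8s.PodInfo{CreationTime: metav1.Now()}}
_ = withPod.GetIdentifier(true) // "192.168.1.1:80:<creation-unix-millis>"

// external: Pod stays nil for ExternalName targets, so the timestamp is
// quietly omitted instead of panicking on a nil dereference.
external := IpEndpoint{IP: "10.10.10.10", Port: 80}
_ = external.GetIdentifier(true) // "10.10.10.10:80"
```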
diff --git a/pkg/ingress/model_build_target_group.go b/pkg/ingress/model_build_target_group.go
index 381fd3972..5cd03c3b0 100644
--- a/pkg/ingress/model_build_target_group.go
+++ b/pkg/ingress/model_build_target_group.go
@@ -153,7 +153,7 @@ func (t *defaultModelBuildTask) buildTargetGroupBindingNetworking(ctx context.Co
 func (t *defaultModelBuildTask) buildTargetGroupSpec(ctx context.Context, ing ClassifiedIngress, svc *corev1.Service, port intstr.IntOrString, svcPort corev1.ServicePort) (elbv2model.TargetGroupSpec, error) {
 	svcAndIngAnnotations := algorithm.MergeStringMap(svc.Annotations, ing.Ing.Annotations)
-	targetType, err := t.buildTargetGroupTargetType(ctx, svcAndIngAnnotations)
+	targetType, err := t.buildTargetGroupTargetType(ctx, svcAndIngAnnotations, svc)
 	if err != nil {
 		return elbv2model.TargetGroupSpec{}, err
 	}
@@ -220,9 +220,13 @@ func (t *defaultModelBuildTask) buildTargetGroupName(_ context.Context,
 	return fmt.Sprintf("k8s-%.8s-%.8s-%.10s", sanitizedNamespace, sanitizedName, uuid)
 }
 
-func (t *defaultModelBuildTask) buildTargetGroupTargetType(_ context.Context, svcAndIngAnnotations map[string]string) (elbv2model.TargetType, error) {
+func (t *defaultModelBuildTask) buildTargetGroupTargetType(_ context.Context, svcAndIngAnnotations map[string]string, svc *corev1.Service) (elbv2model.TargetType, error) {
 	rawTargetType := string(t.defaultTargetType)
 	_ = t.annotationParser.ParseStringAnnotation(annotations.IngressSuffixTargetType, &rawTargetType, svcAndIngAnnotations)
+	if svc.Spec.Type == corev1.ServiceTypeExternalName && rawTargetType != string(elbv2model.TargetTypeIP) {
+		t.logger.Info("Target type will be ip since the service is an ExternalName", "service", k8s.NamespacedName(svc))
+		rawTargetType = string(elbv2model.TargetTypeIP)
+	}
 	switch rawTargetType {
 	case string(elbv2model.TargetTypeInstance):
 		return elbv2model.TargetTypeInstance, nil
diff --git a/pkg/targetgroupbinding/multicluster_manager.go b/pkg/targetgroupbinding/multicluster_manager.go
index 3743feb40..39e13998a 100644
--- a/pkg/targetgroupbinding/multicluster_manager.go
+++ b/pkg/targetgroupbinding/multicluster_manager.go
@@ -25,7 +25,7 @@ type MultiClusterManager interface {
 	FilterTargetsForDeregistration(ctx context.Context, tgb *elbv2api.TargetGroupBinding, targetInfo []TargetInfo) ([]TargetInfo, bool, error)
 
 	// UpdateTrackedIPTargets Update the tracked target set in persistent storage
-	UpdateTrackedIPTargets(ctx context.Context, updateRequested bool, endpoints []backend.PodEndpoint, tgb *elbv2api.TargetGroupBinding) error
+	UpdateTrackedIPTargets(ctx context.Context, updateRequested bool, endpoints []backend.IpEndpoint, tgb *elbv2api.TargetGroupBinding) error
 
 	// UpdateTrackedInstanceTargets Update the tracked target set in persistent storage
 	UpdateTrackedInstanceTargets(ctx context.Context, updateRequested bool, endpoints []backend.NodePortEndpoint, tgb *elbv2api.TargetGroupBinding) error
@@ -54,7 +54,7 @@ func NewMultiClusterManager(kubeClient client.Client, apiReader client.Reader, l
 	}
 }
 
-func (m *multiClusterManagerImpl) UpdateTrackedIPTargets(ctx context.Context, updateRequested bool, endpoints []backend.PodEndpoint, tgb *elbv2api.TargetGroupBinding) error {
+func (m *multiClusterManagerImpl) UpdateTrackedIPTargets(ctx context.Context, updateRequested bool, endpoints []backend.IpEndpoint, tgb *elbv2api.TargetGroupBinding) error {
 	endpointStringFn := func() []string {
 		endpointStrings := make([]string, 0, len(endpoints))
diff --git a/pkg/targetgroupbinding/multicluster_manager_test.go b/pkg/targetgroupbinding/multicluster_manager_test.go
index 4c77c002e..a6729e3af 100644
--- a/pkg/targetgroupbinding/multicluster_manager_test.go
+++ b/pkg/targetgroupbinding/multicluster_manager_test.go
@@ -255,7 +255,7 @@ func TestUpdateTrackedIPTargets(t *testing.T) {
 		},
 	}
 
-	endpoints := []backend.PodEndpoint{
+	endpoints := []backend.IpEndpoint{
 		{
 			IP:   "127.0.0.1",
 			Port: 80,
diff --git a/pkg/targetgroupbinding/networking_manager.go b/pkg/targetgroupbinding/networking_manager.go
index 21fa4b741..202d93b05 100644
--- a/pkg/targetgroupbinding/networking_manager.go
+++ b/pkg/targetgroupbinding/networking_manager.go
@@ -36,7 +36,7 @@ const (
 // NetworkingManager manages the networking for targetGroupBindings.
 type NetworkingManager interface {
 	// ReconcileForPodEndpoints reconcile network settings for TargetGroupBindings with podEndpoints.
-	ReconcileForPodEndpoints(ctx context.Context, tgb *elbv2api.TargetGroupBinding, endpoints []backend.PodEndpoint) error
+	ReconcileForPodEndpoints(ctx context.Context, tgb *elbv2api.TargetGroupBinding, endpoints []backend.IpEndpoint) error
 
 	// ReconcileForNodePortEndpoints reconcile network settings for TargetGroupBindings with nodePortEndpoints.
 	ReconcileForNodePortEndpoints(ctx context.Context, tgb *elbv2api.TargetGroupBinding, endpoints []backend.NodePortEndpoint) error
@@ -96,7 +96,7 @@ type defaultNetworkingManager struct {
 	disableRestrictedSGRules bool
 }
 
-func (m *defaultNetworkingManager) ReconcileForPodEndpoints(ctx context.Context, tgb *elbv2api.TargetGroupBinding, endpoints []backend.PodEndpoint) error {
+func (m *defaultNetworkingManager) ReconcileForPodEndpoints(ctx context.Context, tgb *elbv2api.TargetGroupBinding, endpoints []backend.IpEndpoint) error {
 	var ingressPermissionsPerSG map[string][]networking.IPPermissionInfo
 	if tgb.Spec.Networking != nil {
 		var err error
@@ -124,12 +124,12 @@ func (m *defaultNetworkingManager) Cleanup(ctx context.Context, tgb *elbv2api.Ta
 	return m.reconcileWithIngressPermissionsPerSG(ctx, tgb, nil)
 }
 
-func (m *defaultNetworkingManager) computeIngressPermissionsPerSGWithPodEndpoints(ctx context.Context, tgbNetworking elbv2api.TargetGroupBindingNetworking, endpoints []backend.PodEndpoint) (map[string][]networking.IPPermissionInfo, error) {
+func (m *defaultNetworkingManager) computeIngressPermissionsPerSGWithPodEndpoints(ctx context.Context, tgbNetworking elbv2api.TargetGroupBindingNetworking, endpoints []backend.IpEndpoint) (map[string][]networking.IPPermissionInfo, error) {
 	pods := make([]k8s.PodInfo, 0, len(endpoints))
 	podByPodKey := make(map[types.NamespacedName]k8s.PodInfo, len(endpoints))
 	for _, endpoint := range endpoints {
-		pods = append(pods, endpoint.Pod)
-		podByPodKey[endpoint.Pod.Key] = endpoint.Pod
+		pods = append(pods, *endpoint.Pod)
+		podByPodKey[endpoint.Pod.Key] = *endpoint.Pod
 	}
 	eniInfoByPodKey, err := m.podENIResolver.Resolve(ctx, pods)
 	if err != nil {
diff --git a/pkg/targetgroupbinding/resource_manager.go b/pkg/targetgroupbinding/resource_manager.go
index af25a824f..63984cc04 100644
--- a/pkg/targetgroupbinding/resource_manager.go
+++ b/pkg/targetgroupbinding/resource_manager.go
@@ -141,33 +141,44 @@ func (m *defaultResourceManager) reconcileWithIPTargetType(ctx context.Context,
 		backend.WithPodReadinessGate(targetHealthCondType),
 	}
 
-	var endpoints []backend.PodEndpoint
+	var endpoints []backend.IpEndpoint
 	var containsPotentialReadyEndpoints bool
-	var err error
-
-	endpoints, containsPotentialReadyEndpoints, err = m.endpointResolver.ResolvePodEndpoints(ctx, svcKey, tgb.Spec.ServiceRef.Port, resolveOpts...)
-
+	svc, err := m.endpointResolver.FindService(ctx, svcKey)
 	if err != nil {
-		if errors.Is(err, backend.ErrNotFound) {
-			m.eventRecorder.Event(tgb, corev1.EventTypeWarning, k8s.TargetGroupBindingEventReasonBackendNotFound, err.Error())
-			return "", "", false, m.Cleanup(ctx, tgb)
-		}
 		return "", "", false, err
 	}
+	oldCheckPoint := ""
+	newCheckPoint := ""
+	if svc.Spec.Type == corev1.ServiceTypeExternalName {
+		endpoints, err = m.endpointResolver.ResolveExternalNameEndpoints(ctx, svc, tgb.Spec.ServiceRef.Port)
+		if err != nil {
+			return "", "", false, err
+		}
+	} else {
-	newCheckPoint, err := calculateTGBReconcileCheckpoint(endpoints, tgb)
+		endpoints, containsPotentialReadyEndpoints, err = m.endpointResolver.ResolvePodEndpoints(ctx, svcKey, svc, tgb.Spec.ServiceRef.Port, resolveOpts...)
-	if err != nil {
-		return "", "", false, err
-	}
+		if err != nil {
+			if errors.Is(err, backend.ErrNotFound) {
+				m.eventRecorder.Event(tgb, corev1.EventTypeWarning, k8s.TargetGroupBindingEventReasonBackendNotFound, err.Error())
+				return "", "", false, m.Cleanup(ctx, tgb)
+			}
+			return "", "", false, err
+		}
-	oldCheckPoint := GetTGBReconcileCheckpoint(tgb)
+		newCheckPoint, err = calculateTGBReconcileCheckpoint(endpoints, tgb)
-	if !containsPotentialReadyEndpoints && oldCheckPoint == newCheckPoint {
-		tgbScopedLogger.Info("Skipping targetgroupbinding reconcile", "calculated hash", newCheckPoint)
-		return newCheckPoint, oldCheckPoint, true, nil
-	}
+		if err != nil {
+			return "", "", false, err
+		}
+
+		oldCheckPoint = GetTGBReconcileCheckpoint(tgb)
+
+		if !containsPotentialReadyEndpoints && oldCheckPoint == newCheckPoint {
+			tgbScopedLogger.Info("Skipping targetgroupbinding reconcile", "calculated hash", newCheckPoint)
+			return newCheckPoint, oldCheckPoint, true, nil
+		}
+	}
 
 	tgARN := tgb.Spec.TargetGroupARN
 	vpcID := tgb.Spec.VpcID
 	targets, err := m.targetsManager.ListTargets(ctx, tgARN)
@@ -179,33 +190,35 @@ func (m *defaultResourceManager) reconcileWithIPTargetType(ctx context.Context,
 	matchedEndpointAndTargets, unmatchedEndpoints, unmatchedTargets := matchPodEndpointWithTargets(endpoints, notDrainingTargets)
 
 	needNetworkingRequeue := false
-	if err := m.networkingManager.ReconcileForPodEndpoints(ctx, tgb, endpoints); err != nil {
-		tgbScopedLogger.Error(err, "Requesting network requeue due to error from ReconcileForPodEndpoints")
-		m.eventRecorder.Event(tgb, corev1.EventTypeWarning, k8s.TargetGroupBindingEventReasonFailedNetworkReconcile, err.Error())
-		needNetworkingRequeue = true
-	}
+	if svc.Spec.Type != corev1.ServiceTypeExternalName {
+		if err := m.networkingManager.ReconcileForPodEndpoints(ctx, tgb, endpoints); err != nil {
+			tgbScopedLogger.Error(err, "Requesting network requeue due to error from ReconcileForPodEndpoints")
+			m.eventRecorder.Event(tgb, corev1.EventTypeWarning, k8s.TargetGroupBindingEventReasonFailedNetworkReconcile, err.Error())
+			needNetworkingRequeue = true
+		}
 
-	preflightNeedFurtherProbe := false
-	for _, endpointAndTarget := range matchedEndpointAndTargets {
-		_, localPreflight := m.calculateReadinessGateTransition(endpointAndTarget.endpoint.Pod, targetHealthCondType, endpointAndTarget.target.TargetHealth)
-		if localPreflight {
-			preflightNeedFurtherProbe = true
-			break
+		preflightNeedFurtherProbe := false
+		for _, endpointAndTarget := range matchedEndpointAndTargets {
+			_, localPreflight := m.calculateReadinessGateTransition(*endpointAndTarget.endpoint.Pod, targetHealthCondType, endpointAndTarget.target.TargetHealth)
+			if localPreflight {
+				preflightNeedFurtherProbe = true
+				break
+			}
 		}
-	}
 
-	// Any change that we perform should reset the checkpoint.
-	// TODO - How to make this cleaner?
-	if len(unmatchedEndpoints) > 0 || len(unmatchedTargets) > 0 || needNetworkingRequeue || containsPotentialReadyEndpoints || preflightNeedFurtherProbe {
-		// Set to an empty checkpoint, to ensure that no matter what we try to reconcile atleast one more time.
-		// Consider this ordering of events (without using this method of overriding the checkpoint)
-		// 1. Register some pod IP, don't update TGB checkpoint.
-		// 2. Before next invocation of reconcile happens, the pod is removed.
-		// 3. The next reconcile loop has no knowledge that it needs to deregister the pod ip, therefore it skips deregistering the removed pod ip.
-		err = m.updateTGBCheckPoint(ctx, tgb, "", oldCheckPoint)
-		if err != nil {
-			tgbScopedLogger.Error(err, "Unable to update checkpoint before mutating change")
-			return "", "", false, err
+		// Any change that we perform should reset the checkpoint.
+		// TODO - How to make this cleaner?
+		if len(unmatchedEndpoints) > 0 || len(unmatchedTargets) > 0 || needNetworkingRequeue || containsPotentialReadyEndpoints || preflightNeedFurtherProbe {
+			// Set to an empty checkpoint, to ensure that no matter what, we reconcile at least one more time.
+			// Consider this ordering of events (without using this method of overriding the checkpoint)
+			// 1. Register some pod IP, don't update TGB checkpoint.
+			// 2. Before next invocation of reconcile happens, the pod is removed.
+			// 3. The next reconcile loop has no knowledge that it needs to deregister the pod ip, therefore it skips deregistering the removed pod ip.
+			err = m.updateTGBCheckPoint(ctx, tgb, "", oldCheckPoint)
+			if err != nil {
+				tgbScopedLogger.Error(err, "Unable to update checkpoint before mutating change")
+				return "", "", false, err
+			}
 		}
 	}
 
@@ -238,24 +251,26 @@ func (m *defaultResourceManager) reconcileWithIPTargetType(ctx context.Context,
 			return "", "", false, err
 		}
 	}
+	if svc.Spec.Type != corev1.ServiceTypeExternalName {
-	if err := m.multiClusterManager.UpdateTrackedIPTargets(ctx, updateTrackedTargets, endpoints, tgb); err != nil {
-		return "", "", false, err
-	}
+		if err := m.multiClusterManager.UpdateTrackedIPTargets(ctx, updateTrackedTargets, endpoints, tgb); err != nil {
+			return "", "", false, err
+		}
 
 		anyPodNeedFurtherProbe, err := m.updateTargetHealthPodCondition(ctx, targetHealthCondType, matchedEndpointAndTargets, unmatchedEndpoints, tgb)
-	if err != nil {
-		return "", "", false, err
-	}
+		if err != nil {
+			return "", "", false, err
+		}
 
-	if anyPodNeedFurtherProbe {
-		tgbScopedLogger.Info("Requeue for target monitor target health")
-		return "", "", false, runtime.NewRequeueNeededAfter("monitor targetHealth", m.requeueDuration)
-	}
+		if anyPodNeedFurtherProbe {
+			tgbScopedLogger.Info("Requeue for target monitor target health")
+			return "", "", false, runtime.NewRequeueNeededAfter("monitor targetHealth", m.requeueDuration)
+		}
 
-	if containsPotentialReadyEndpoints {
-		tgbScopedLogger.Info("Requeue for potentially ready endpoints")
-		return "", "", false, runtime.NewRequeueNeededAfter("monitor potential ready endpoints", m.requeueDuration)
+		if containsPotentialReadyEndpoints {
+			tgbScopedLogger.Info("Requeue for potentially ready endpoints")
+			return "", "", false, runtime.NewRequeueNeededAfter("monitor potential ready endpoints", m.requeueDuration)
+		}
 	}
 
 	if needNetworkingRequeue {
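Note (editor's sketch, not part of the patch): the checkpoint handling above is easy to misread, so here is a condensation of the invariant. Only the reset call is verbatim from the hunk; the helper and the closing call's argument shape are assumptions:

```go
// Hypothetical condensation of the checkpoint protocol used in reconcileWithIPTargetType.
func commitWithCheckpoint(ctx context.Context, m *defaultResourceManager, tgb *elbv2api.TargetGroupBinding,
	oldCheckPoint, newCheckPoint string, mutate func() error) error {
	// Blank the checkpoint first: if the process dies after mutating targets
	// but before persisting newCheckPoint, the next reconcile sees "" and
	// cannot skip the work it no longer knows about.
	if err := m.updateTGBCheckPoint(ctx, tgb, "", oldCheckPoint); err != nil {
		return err
	}
	if err := mutate(); err != nil { // register/deregister targets
		return err
	}
	// Persist the real checkpoint only once the mutation succeeded
	// (argument shape assumed from the reset call above).
	return m.updateTGBCheckPoint(ctx, tgb, newCheckPoint, "")
}
```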
@@ -377,13 +392,13 @@ func (m *defaultResourceManager) cleanupTargets(ctx context.Context, tgb *elbv2a
 // updateTargetHealthPodCondition will updates pod's targetHealth condition for matchedEndpointAndTargets and unmatchedEndpoints.
 // returns whether further probe is needed or not
 func (m *defaultResourceManager) updateTargetHealthPodCondition(ctx context.Context, targetHealthCondType corev1.PodConditionType,
-	matchedEndpointAndTargets []podEndpointAndTargetPair, unmatchedEndpoints []backend.PodEndpoint, tgb *elbv2api.TargetGroupBinding) (bool, error) {
+	matchedEndpointAndTargets []podEndpointAndTargetPair, unmatchedEndpoints []backend.IpEndpoint, tgb *elbv2api.TargetGroupBinding) (bool, error) {
 	anyPodNeedFurtherProbe := false
 
 	for _, endpointAndTarget := range matchedEndpointAndTargets {
 		pod := endpointAndTarget.endpoint.Pod
 		targetHealth := endpointAndTarget.target.TargetHealth
-		needFurtherProbe, err := m.updateTargetHealthPodConditionForPod(ctx, pod, targetHealth, targetHealthCondType, tgb)
+		needFurtherProbe, err := m.updateTargetHealthPodConditionForPod(ctx, *pod, targetHealth, targetHealthCondType, tgb)
 		if err != nil {
 			return false, err
 		}
@@ -399,7 +414,7 @@ func (m *defaultResourceManager) updateTargetHealthPodCondition(ctx context.Cont
 			Reason:      elbv2types.TargetHealthReasonEnumRegistrationInProgress,
 			Description: awssdk.String("Target registration is in progress"),
 		}
-		needFurtherProbe, err := m.updateTargetHealthPodConditionForPod(ctx, pod, targetHealth, targetHealthCondType, tgb)
+		needFurtherProbe, err := m.updateTargetHealthPodConditionForPod(ctx, *pod, targetHealth, targetHealthCondType, tgb)
 		if err != nil {
 			return false, err
 		}
@@ -544,7 +559,7 @@ func (m *defaultResourceManager) deregisterTargets(ctx context.Context, tgb *elb
 	return true, m.targetsManager.DeregisterTargets(ctx, tgARN, sdkTargets)
 }
 
-func (m *defaultResourceManager) registerPodEndpoints(ctx context.Context, tgARN, tgVpcID string, endpoints []backend.PodEndpoint) error {
+func (m *defaultResourceManager) registerPodEndpoints(ctx context.Context, tgARN, tgVpcID string, endpoints []backend.IpEndpoint) error {
 	vpcID := m.vpcID
 	// Target group is in a different VPC from the cluster's VPC
 	if tgVpcID != "" && tgVpcID != m.vpcID {
@@ -608,7 +623,7 @@ func (m *defaultResourceManager) updateTGBCheckPoint(ctx context.Context, tgb *e
 }
 
 type podEndpointAndTargetPair struct {
-	endpoint backend.PodEndpoint
+	endpoint backend.IpEndpoint
 	target   TargetInfo
 }
 
@@ -634,12 +649,12 @@ func containsTargetsInInitialState(matchedEndpointAndTargets []podEndpointAndTar
 	return false
 }
 
-func matchPodEndpointWithTargets(endpoints []backend.PodEndpoint, targets []TargetInfo) ([]podEndpointAndTargetPair, []backend.PodEndpoint, []TargetInfo) {
+func matchPodEndpointWithTargets(endpoints []backend.IpEndpoint, targets []TargetInfo) ([]podEndpointAndTargetPair, []backend.IpEndpoint, []TargetInfo) {
 	var matchedEndpointAndTargets []podEndpointAndTargetPair
-	var unmatchedEndpoints []backend.PodEndpoint
+	var unmatchedEndpoints []backend.IpEndpoint
 	var unmatchedTargets []TargetInfo
 
-	endpointsByUID := make(map[string]backend.PodEndpoint, len(endpoints))
+	endpointsByUID := make(map[string]backend.IpEndpoint, len(endpoints))
 	for _, endpoint := range endpoints {
 		endpointUID := fmt.Sprintf("%v:%v", endpoint.IP, endpoint.Port)
 		endpointsByUID[endpointUID] = endpoint
diff --git a/pkg/targetgroupbinding/targets_manager_types_test.go b/pkg/targetgroupbinding/targets_manager_types_test.go
index 08583630f..e4f39f8ef 100644
--- a/pkg/targetgroupbinding/targets_manager_types_test.go
+++ b/pkg/targetgroupbinding/targets_manager_types_test.go
@@ -280,7 +280,7 @@ func TestGetIdentifier(t *testing.T) {
 		},
 		{
 			name: "ip",
-			endpoint: backend.PodEndpoint{
+			endpoint: backend.IpEndpoint{
 				IP:   "127.0.0.1",
 				Port: 80,
 			},