From 3e6d7f63e34b16cebe1f8373a0de83069123b142 Mon Sep 17 00:00:00 2001 From: andrew Date: Tue, 15 Oct 2024 13:19:40 +0700 Subject: [PATCH] =?UTF-8?q?=F0=9F=94=A8=20fix:=20Port=20IntOrString,=20add?= =?UTF-8?q?=20cni=20detector=20to=20add=20secgroup=20when=20using=20cilium?= =?UTF-8?q?=20native=20routing?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- pkg/cni_detector/cni_detector.go | 94 ++++++ pkg/cni_detector/cni_detector_test.go | 125 ++++++++ pkg/endpoint_resolver/endpoint_resolver.go | 230 +++++++++++++ .../endpoint_resolver_test.go | 301 ++++++++++++++++++ pkg/ingress/controller/controller.go | 158 +++++---- pkg/vngcloud/vcontainer.go | 1 - pkg/vngcloud/vlb.go | 144 ++++++--- 7 files changed, 935 insertions(+), 118 deletions(-) create mode 100644 pkg/cni_detector/cni_detector.go create mode 100644 pkg/cni_detector/cni_detector_test.go create mode 100644 pkg/endpoint_resolver/endpoint_resolver.go create mode 100644 pkg/endpoint_resolver/endpoint_resolver_test.go diff --git a/pkg/cni_detector/cni_detector.go b/pkg/cni_detector/cni_detector.go new file mode 100644 index 0000000..60c5971 --- /dev/null +++ b/pkg/cni_detector/cni_detector.go @@ -0,0 +1,94 @@ +package cni_detector + +import ( + "context" + + apimetav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/kubernetes" + "k8s.io/klog/v2" +) + +// CNIType represents the CNI type detected in the cluster. +type CNIType string + +const ( + CalicoOverlay CNIType = "Calico Overlay" + CiliumOverlay CNIType = "Cilium Overlay" + CiliumNativeRouting CNIType = "Cilium Native Routing" + UnknownCNI CNIType = "Unknown CNI" +) + +// Detector detects the CNI type used in the Kubernetes cluster. +type Detector struct { + kubeClient kubernetes.Interface +} + +// NewDetector creates a new instance of the CNI Detector. +func NewDetector(kubeClient kubernetes.Interface) *Detector { + return &Detector{kubeClient: kubeClient} +} + +// DetectCNIType detects the CNI type in the cluster. 
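+// Detection is best-effort and ordered: Calico first, then Cilium native
+// routing, then Cilium overlay; anything else is reported as UnknownCNI.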
+func (d *Detector) DetectCNIType() (CNIType, error) {
+	// Check for Calico
+	if d.isCalicoOverlay() {
+		return CalicoOverlay, nil
+	}
+
+	// Check for Cilium, preferring the more specific native-routing mode
+	if d.isCiliumNativeRouting() {
+		return CiliumNativeRouting, nil
+	}
+
+	if d.isCiliumOverlay() {
+		return CiliumOverlay, nil
+	}
+
+	return UnknownCNI, nil
+}
+
+// Check if Calico Overlay is running
+func (d *Detector) isCalicoOverlay() bool {
+	_, err := d.kubeClient.AppsV1().DaemonSets("kube-system").Get(context.TODO(), "calico-node", apimetav1.GetOptions{})
+	if err != nil {
+		// a missing DaemonSet is the expected negative result, not a failure
+		klog.V(4).Infof("calico-node DaemonSet not found: %v", err)
+		return false
+	}
+	return true
+}
+
+// Check if Cilium Overlay is running
+func (d *Detector) isCiliumOverlay() bool {
+	_, err := d.kubeClient.AppsV1().DaemonSets("kube-system").Get(context.TODO(), "cilium", apimetav1.GetOptions{})
+	if err != nil {
+		klog.V(4).Infof("cilium DaemonSet not found: %v", err)
+		return false
+	}
+	return true
+}
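+
+// For reference, native routing is detected from the cilium-config ConfigMap;
+// a minimal sketch of the shape it expects:
+//
+//	apiVersion: v1
+//	kind: ConfigMap
+//	metadata:
+//	  name: cilium-config
+//	  namespace: kube-system
+//	data:
+//	  routing-mode: native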
+// Check if Cilium Native Routing is running
+func (d *Detector) isCiliumNativeRouting() bool {
+	if !d.isCiliumOverlay() {
+		return false
+	}
+	// get the cilium-config ConfigMap
+	ciliumConfigMap, err := d.kubeClient.CoreV1().ConfigMaps("kube-system").Get(context.TODO(), "cilium-config", apimetav1.GetOptions{})
+	if err != nil {
+		klog.V(4).Infof("cilium-config ConfigMap not found: %v", err)
+		return false
+	}
+	// check if cilium-config has routing-mode: native
+	return ciliumConfigMap.Data["routing-mode"] == "native"
+}
diff --git a/pkg/cni_detector/cni_detector_test.go b/pkg/cni_detector/cni_detector_test.go
new file mode 100644
index 0000000..84484ec
--- /dev/null
+++ b/pkg/cni_detector/cni_detector_test.go
@@ -0,0 +1,125 @@
+package cni_detector
+
+import (
+	"context"
+	"os"
+	"testing"
+
+	appsv1 "k8s.io/api/apps/v1"
+	corev1 "k8s.io/api/core/v1"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/client-go/kubernetes"
+	"k8s.io/client-go/kubernetes/fake"
+	"k8s.io/client-go/tools/clientcmd"
+)
+
+func TestDetectCNIType_CalicoOverlay(t *testing.T) {
+	// Set up a fake Kubernetes client with Calico DaemonSet
+	client := fake.NewSimpleClientset()
+	client.AppsV1().DaemonSets("kube-system").Create(context.TODO(), &appsv1.DaemonSet{
+		ObjectMeta: metav1.ObjectMeta{
+			Name:      "calico-node",
+			Namespace: "kube-system",
+		},
+	}, metav1.CreateOptions{})
+
+	detector := &Detector{kubeClient: client}
+	cniType, err := detector.DetectCNIType()
+	if err != nil {
+		t.Fatalf("expected no error, got %v", err)
+	}
+	if cniType != CalicoOverlay {
+		t.Errorf("expected CNI type %s, got %s", CalicoOverlay, cniType)
+	}
+}
+
+func TestDetectCNIType_CiliumOverlay(t *testing.T) {
+	// Set up a fake Kubernetes client with Cilium DaemonSet
+	client := fake.NewSimpleClientset()
+	client.AppsV1().DaemonSets("kube-system").Create(context.TODO(), &appsv1.DaemonSet{
+		ObjectMeta: metav1.ObjectMeta{
+			Name:      "cilium",
+			Namespace: "kube-system",
+		},
+	}, metav1.CreateOptions{})
+
+	detector := &Detector{kubeClient: client}
+	cniType, err := detector.DetectCNIType()
+	if err != nil {
+		t.Fatalf("expected no error, got %v", err)
+	}
+	if cniType != CiliumOverlay {
+		t.Errorf("expected CNI type %s, got %s", CiliumOverlay, cniType)
+	}
+}
+
+func TestDetectCNIType_CiliumNativeRouting(t *testing.T) {
+	// Set up a fake Kubernetes client with Cilium DaemonSet and config map
+	client := fake.NewSimpleClientset()
+
+	// Create the Cilium DaemonSet
+	client.AppsV1().DaemonSets("kube-system").Create(context.TODO(), &appsv1.DaemonSet{
+		ObjectMeta: metav1.ObjectMeta{
+			Name:      "cilium",
+			Namespace: "kube-system",
+		},
+	}, metav1.CreateOptions{})
+
+	// Create the Cilium ConfigMap with routing-mode: native
+	client.CoreV1().ConfigMaps("kube-system").Create(context.TODO(), &corev1.ConfigMap{
+		ObjectMeta: metav1.ObjectMeta{
+			Name:      "cilium-config",
+			Namespace: "kube-system",
+		},
+		Data: map[string]string{
+			"routing-mode": "native",
+		},
+	}, metav1.CreateOptions{})
+
+	detector := &Detector{kubeClient: client}
+	cniType, err := detector.DetectCNIType()
+	if err != nil {
+		t.Fatalf("expected no error, got %v", err)
+	}
+	if cniType != CiliumNativeRouting {
+		t.Errorf("expected CNI type %s, got %s", CiliumNativeRouting, cniType)
+	}
+}
+
+func TestDetectCNIType_UnknownCNI(t *testing.T) {
+	// Set up a fake Kubernetes client without any CNI configuration
+	client := fake.NewSimpleClientset()
+
+	detector := &Detector{kubeClient: client}
+	cniType, err := detector.DetectCNIType()
+	if err != nil {
+		t.Fatalf("expected no error, got %v", err)
+	}
+	if cniType != UnknownCNI {
+		t.Errorf("expected CNI type %s, got %s", UnknownCNI, cniType)
+	}
+}
+
+func TestRealCluster(t *testing.T) {
+	// the kubeconfig path is supplied via an environment variable, so the test
+	// is skipped by default and never depends on a developer-local file
+	configPath := os.Getenv("TEST_KUBECONFIG")
+	if configPath == "" {
+		t.Skip("Skipping test; no kubeconfig provided")
+	}
+
+	config, err := clientcmd.BuildConfigFromFlags("", configPath)
+	if err != nil {
+		t.Fatalf("failed to build kubeconfig: %v", err)
+	}
+	clientset, err := kubernetes.NewForConfig(config)
+	if err != nil {
+		t.Fatalf("failed to create Kubernetes client: %v", err)
+	}
+
+	detector := NewDetector(clientset)
+	cniType, err := detector.DetectCNIType()
+	if err != nil {
+		t.Fatalf("expected no error, got %v", err)
+	}
+	t.Logf("Detected CNI type: %s", cniType)
+}
diff --git a/pkg/endpoint_resolver/endpoint_resolver.go b/pkg/endpoint_resolver/endpoint_resolver.go
new file mode 100644
index 0000000..0d8aee2
--- /dev/null
+++ b/pkg/endpoint_resolver/endpoint_resolver.go
@@ -0,0 +1,230 @@
+package endpoint_resolver
+
+import (
+	"fmt"
+	"slices"
+
+	"github.com/pkg/errors"
+	corev1 "k8s.io/api/core/v1"
+	nwv1 "k8s.io/api/networking/v1"
+	apierrors "k8s.io/apimachinery/pkg/api/errors"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/types"
+	"k8s.io/apimachinery/pkg/util/intstr"
+	corelisters "k8s.io/client-go/listers/core/v1"
+)
+
+var ErrNotFound = errors.New("backend not found")
+var ErrNodeDoesNotHaveInternalAddress = errors.New("node does not have internal address")
+
+// EndpointAddress is a single backend endpoint: a pod IP with its target port,
+// or a node IP with the service's NodePort.
+type EndpointAddress struct {
+	Name string
+	IP   string
+	Port int
+}
+
+// EndpointResolver resolves the endpoints for a specific service and service port.
+type EndpointResolver interface {
+	// ServiceBackendToIntOrString converts an Ingress ServiceBackendPort to an IntOrString.
+	ServiceBackendToIntOrString(port nwv1.ServiceBackendPort) intstr.IntOrString
+	// GetListTargetPort returns the list of target ports for the service.
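+	// Illustrative mapping: a Service port named "http" whose endpoints point
+	// at pods listening on containerPort 8080 would resolve to [8080].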
+ GetListTargetPort(svcKey types.NamespacedName, port intstr.IntOrString) ([]int, error) + ResolvePodEndpoints(svcKey types.NamespacedName, port intstr.IntOrString) ([]EndpointAddress, error) + ResolveNodePortEndpoints(svcKey types.NamespacedName, port intstr.IntOrString, nodes []*corev1.Node) ([]EndpointAddress, error) +} + +// NewDefaultEndpointResolver constructs new defaultEndpointResolver +func NewDefaultEndpointResolver(serviceLister corelisters.ServiceLister, endpointLister corelisters.EndpointsLister) *defaultEndpointResolver { + return &defaultEndpointResolver{ + serviceLister: serviceLister, + endpointLister: endpointLister, + } +} + +var _ EndpointResolver = &defaultEndpointResolver{} + +// default implementation for EndpointResolver +type defaultEndpointResolver struct { + serviceLister corelisters.ServiceLister + endpointLister corelisters.EndpointsLister +} + +func (r *defaultEndpointResolver) ServiceBackendToIntOrString(port nwv1.ServiceBackendPort) intstr.IntOrString { + if port.Name != "" { + return intstr.FromString(port.Name) + } + return intstr.FromInt(int(port.Number)) +} + +func (r *defaultEndpointResolver) GetListTargetPort(svcKey types.NamespacedName, port intstr.IntOrString) ([]int, error) { + svc, svcPort, err := r.findServiceAndServicePort(svcKey, port) + if err != nil { + return nil, err + } + + var ports []int + endpoints, err := r.endpointLister.Endpoints(svc.Namespace).Get(svc.Name) + if err != nil { + return nil, err + } + for _, subset := range endpoints.Subsets { + for _, addr := range subset.Addresses { + if addr.TargetRef == nil || addr.TargetRef.Kind != "Pod" { + continue + } + for _, port := range subset.Ports { + if port.Name == svcPort.Name && !slices.Contains(ports, int(port.Port)) { + ports = append(ports, int(port.Port)) + } + } + } + + for _, addr := range subset.NotReadyAddresses { + if addr.TargetRef == nil || addr.TargetRef.Kind != "Pod" { + continue + } + for _, port := range subset.Ports { + if port.Name == svcPort.Name && !slices.Contains(ports, int(port.Port)) { + ports = append(ports, int(port.Port)) + } + } + } + } + + return ports, nil +} + +func (r *defaultEndpointResolver) ResolvePodEndpoints(svcKey types.NamespacedName, port intstr.IntOrString) ([]EndpointAddress, error) { + svc, svcPort, err := r.findServiceAndServicePort(svcKey, port) + if err != nil { + return nil, err + } + + endpoints, err := r.endpointLister.Endpoints(svc.Namespace).Get(svc.Name) + if err != nil { + return nil, err + } + + var podEndpoints []EndpointAddress + for _, subset := range endpoints.Subsets { + for _, addr := range subset.Addresses { + if addr.TargetRef == nil || addr.TargetRef.Kind != "Pod" { + continue + } + for _, port := range subset.Ports { + if port.Name == svcPort.Name { + podEndpoints = append(podEndpoints, EndpointAddress{ + IP: addr.IP, + Port: int(port.Port), + Name: addr.TargetRef.Name, + }) + } + } + } + + for _, addr := range subset.NotReadyAddresses { + if addr.TargetRef == nil || addr.TargetRef.Kind != "Pod" { + continue + } + for _, port := range subset.Ports { + if port.Name == svcPort.Name { + podEndpoints = append(podEndpoints, EndpointAddress{ + IP: addr.IP, + Port: int(port.Port), + Name: addr.TargetRef.Name, + }) + } + } + } + } + + return podEndpoints, nil +} + +func (r *defaultEndpointResolver) ResolveNodePortEndpoints(svcKey types.NamespacedName, port intstr.IntOrString, nodes []*corev1.Node) ([]EndpointAddress, error) { + svc, svcPort, err := r.findServiceAndServicePort(svcKey, port) + if err != nil { + return nil, err + } + 
if svc.Spec.Type != corev1.ServiceTypeNodePort && svc.Spec.Type != corev1.ServiceTypeLoadBalancer { + return nil, errors.Errorf("service type must be either 'NodePort' or 'LoadBalancer': %v", svcKey) + } + svcNodePort := svcPort.NodePort + var endpoints []EndpointAddress + for _, node := range nodes { + nodeIP, err := r.getNodeInternalIP(node) + if err != nil { + continue + } + endpoints = append(endpoints, r.buildNodePortEndpoint(nodeIP, node.Name, svcNodePort)) + } + return endpoints, nil +} + +func (r *defaultEndpointResolver) findServiceAndServicePort(svcKey types.NamespacedName, port intstr.IntOrString) (*corev1.Service, corev1.ServicePort, error) { + svc, err := r.serviceLister.Services(svcKey.Namespace).Get(svcKey.Name) + if err != nil { + if apierrors.IsNotFound(err) { + return nil, corev1.ServicePort{}, fmt.Errorf("%w: %v", ErrNotFound, err.Error()) + } + return nil, corev1.ServicePort{}, err + } + + svcPort, err := r.lookupServicePort(svc, port) + if err != nil { + return nil, corev1.ServicePort{}, fmt.Errorf("%w: %v", ErrNotFound, err.Error()) + } + + return svc, svcPort, nil +} + +// lookupServicePort returns the ServicePort structure for specific port on service. +func (r *defaultEndpointResolver) lookupServicePort(svc *corev1.Service, port intstr.IntOrString) (corev1.ServicePort, error) { + if port.Type == intstr.String { + for _, p := range svc.Spec.Ports { + if p.Name == port.StrVal { + return p, nil + } + } + } else { + for _, p := range svc.Spec.Ports { + if p.Port == port.IntVal { + return p, nil + } + } + } + + return corev1.ServicePort{}, errors.Errorf("unable to find port %s on service %s", port.String(), namespacedName(svc)) +} + +func (r *defaultEndpointResolver) buildNodePortEndpoint(IP, instanceID string, nodePort int32) EndpointAddress { + return EndpointAddress{ + Name: instanceID, + Port: int(nodePort), + IP: IP, + } +} + +func (r *defaultEndpointResolver) getNodeInternalIP(node *corev1.Node) (string, error) { + addrs := node.Status.Addresses + if len(addrs) == 0 { + return "", ErrNodeDoesNotHaveInternalAddress + } + + for _, addr := range addrs { + if addr.Type == corev1.NodeInternalIP { + return addr.Address, nil + } + } + + return "", ErrNodeDoesNotHaveInternalAddress +} + +// namespacedName returns the namespaced name for k8s objects +func namespacedName(obj metav1.Object) types.NamespacedName { + return types.NamespacedName{ + Namespace: obj.GetNamespace(), + Name: obj.GetName(), + } +} diff --git a/pkg/endpoint_resolver/endpoint_resolver_test.go b/pkg/endpoint_resolver/endpoint_resolver_test.go new file mode 100644 index 0000000..fbcd86d --- /dev/null +++ b/pkg/endpoint_resolver/endpoint_resolver_test.go @@ -0,0 +1,301 @@ +package endpoint_resolver + +import ( + "errors" + "testing" + + "github.com/stretchr/testify/assert" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/intstr" + corelisters "k8s.io/client-go/listers/core/v1" +) + +type mockServiceLister struct { + services map[types.NamespacedName]*corev1.Service +} + +func (m *mockServiceLister) Services(namespace string) corelisters.ServiceNamespaceLister { + return &mockServiceNamespaceLister{ + services: m.services, + namespace: namespace, + } +} + +func (m *mockServiceLister) List(selector labels.Selector) ([]*corev1.Service, error) { + var result []*corev1.Service + for _, svc := range m.services { + if selector.Matches(labels.Set(svc.Labels)) { + result = 
append(result, svc) + } + } + return result, nil +} + +type mockServiceNamespaceLister struct { + services map[types.NamespacedName]*corev1.Service + namespace string +} + +func (m *mockServiceNamespaceLister) Get(name string) (*corev1.Service, error) { + svc, exists := m.services[types.NamespacedName{Namespace: m.namespace, Name: name}] + if !exists { + return nil, errors.New("service not found") + } + return svc, nil +} + +func (m *mockServiceNamespaceLister) List(selector labels.Selector) ([]*corev1.Service, error) { + var result []*corev1.Service + for _, svc := range m.services { + if selector.Matches(labels.Set(svc.Labels)) { + result = append(result, svc) + } + } + return result, nil +} + +type mockEndpointsLister struct { + endpoints map[types.NamespacedName]*corev1.Endpoints +} + +func (m *mockEndpointsLister) Endpoints(namespace string) corelisters.EndpointsNamespaceLister { + return &mockEndpointsNamespaceLister{ + endpoints: m.endpoints, + namespace: namespace, + } +} + +func (m *mockEndpointsLister) List(selector labels.Selector) ([]*corev1.Endpoints, error) { + var result []*corev1.Endpoints + for _, ep := range m.endpoints { + if selector.Matches(labels.Set(ep.Labels)) { + result = append(result, ep) + } + } + return result, nil +} + +type mockEndpointsNamespaceLister struct { + endpoints map[types.NamespacedName]*corev1.Endpoints + namespace string +} + +func (m *mockEndpointsNamespaceLister) Get(name string) (*corev1.Endpoints, error) { + ep, exists := m.endpoints[types.NamespacedName{Namespace: m.namespace, Name: name}] + if !exists { + return nil, errors.New("endpoints not found") + } + return ep, nil +} + +func (m *mockEndpointsNamespaceLister) List(selector labels.Selector) ([]*corev1.Endpoints, error) { + var result []*corev1.Endpoints + for _, ep := range m.endpoints { + if selector.Matches(labels.Set(ep.Labels)) { + result = append(result, ep) + } + } + return result, nil +} + +func TestResolvePodEndpoints(t *testing.T) { + // Setup mock services and endpoints + services := map[types.NamespacedName]*corev1.Service{ + {Namespace: "default", Name: "test-service"}: { + TypeMeta: metav1.TypeMeta{APIVersion: "v1", Kind: "Service"}, + ObjectMeta: metav1.ObjectMeta{Name: "test-service", Namespace: "default"}, + Spec: corev1.ServiceSpec{ + Ports: []corev1.ServicePort{ + {Name: "http", Port: 80}, + }, + }, + }, + } + endpoints := map[types.NamespacedName]*corev1.Endpoints{ + {Namespace: "default", Name: "test-service"}: { + TypeMeta: metav1.TypeMeta{APIVersion: "v1", Kind: "Endpoints"}, + ObjectMeta: metav1.ObjectMeta{Name: "test-service", Namespace: "default"}, + Subsets: []corev1.EndpointSubset{ + { + Addresses: []corev1.EndpointAddress{ + {IP: "10.0.0.1", TargetRef: &corev1.ObjectReference{Name: "pod-1", Kind: "Pod"}}, + }, + Ports: []corev1.EndpointPort{ + {Name: "http", Port: 80}, + }, + }, + }, + }, + } + + // Create resolver + serviceLister := &mockServiceLister{services: services} + endpointsLister := &mockEndpointsLister{endpoints: endpoints} + resolver := NewDefaultEndpointResolver(serviceLister, endpointsLister) + + // Test case + svcKey := types.NamespacedName{Namespace: "default", Name: "test-service"} + port := intstr.FromString("http") + + // Act + endpointsResult, err := resolver.ResolvePodEndpoints(svcKey, port) + + // Assert + assert.NoError(t, err) + assert.Equal(t, 1, len(endpointsResult)) + assert.Equal(t, "10.0.0.1", endpointsResult[0].IP) + assert.Equal(t, 80, endpointsResult[0].Port) +} + +func TestResolveNodePortEndpoints(t *testing.T) { + // Setup 
mock services
+	services := map[types.NamespacedName]*corev1.Service{
+		{Namespace: "default", Name: "test-service"}: {
+			Spec: corev1.ServiceSpec{
+				Type: corev1.ServiceTypeNodePort,
+				Ports: []corev1.ServicePort{
+					{Name: "http", NodePort: 30080},
+				},
+			},
+		},
+	}
+
+	// Setup nodes
+	nodes := []*corev1.Node{
+		{
+			Status: corev1.NodeStatus{
+				Addresses: []corev1.NodeAddress{
+					{Type: corev1.NodeInternalIP, Address: "192.168.0.1"},
+				},
+			},
+		},
+	}
+
+	// Create resolver
+	serviceLister := &mockServiceLister{services: services}
+	resolver := NewDefaultEndpointResolver(serviceLister, nil)
+
+	// Test case
+	svcKey := types.NamespacedName{Namespace: "default", Name: "test-service"}
+	port := intstr.FromString("http")
+
+	// Act
+	endpointsResult, err := resolver.ResolveNodePortEndpoints(svcKey, port, nodes)
+
+	// Assert
+	assert.NoError(t, err)
+	assert.Equal(t, 1, len(endpointsResult))
+	assert.Equal(t, "192.168.0.1", endpointsResult[0].IP)
+	assert.Equal(t, 30080, endpointsResult[0].Port)
+}
+
+func TestFindServiceAndServicePortNotFound(t *testing.T) {
+	// Setup empty mock services
+	serviceLister := &mockServiceLister{services: map[types.NamespacedName]*corev1.Service{}}
+	resolver := NewDefaultEndpointResolver(serviceLister, nil)
+
+	// Test case
+	svcKey := types.NamespacedName{Namespace: "default", Name: "missing-service"}
+	port := intstr.FromString("http")
+
+	// Act
+	_, err := resolver.ResolvePodEndpoints(svcKey, port)
+
+	// Assert
+	assert.Error(t, err)
+}
+
+func TestGetNodeInternalIP(t *testing.T) {
+	resolver := NewDefaultEndpointResolver(nil, nil)
+
+	// Test case 1: Node has internal IP
+	node := &corev1.Node{
+		Status: corev1.NodeStatus{
+			Addresses: []corev1.NodeAddress{
+				{Type: corev1.NodeInternalIP, Address: "192.168.1.1"},
+			},
+		},
+	}
+	ip, err := resolver.getNodeInternalIP(node)
+	assert.NoError(t, err)
+	assert.Equal(t, "192.168.1.1", ip)
+
+	// Test case 2: Node does not have internal IP
+	nodeNoIP := &corev1.Node{
+		Status: corev1.NodeStatus{
+			Addresses: []corev1.NodeAddress{},
+		},
+	}
+	_, err = resolver.getNodeInternalIP(nodeNoIP)
+	assert.ErrorIs(t, err, ErrNodeDoesNotHaveInternalAddress)
+}
+
+func TestResolvePodEndpointsMultipleSubsets(t *testing.T) {
+	// Setup mock services and endpoints
+	services := map[types.NamespacedName]*corev1.Service{
+		{Namespace: "default", Name: "test-service"}: {
+			TypeMeta:   metav1.TypeMeta{APIVersion: "v1", Kind: "Service"},
+			ObjectMeta: metav1.ObjectMeta{Name: "test-service", Namespace: "default"},
+			Spec: corev1.ServiceSpec{
+				Ports: []corev1.ServicePort{
+					{Name: "http", Port: 80, TargetPort: intstr.FromString("http")},
+				},
+			},
+		},
+	}
+	endpoints := map[types.NamespacedName]*corev1.Endpoints{
+		{Namespace: "default", Name: "test-service"}: {
+			TypeMeta:   metav1.TypeMeta{APIVersion: "v1", Kind: "Endpoints"},
+			ObjectMeta: metav1.ObjectMeta{Name: "test-service", Namespace: "default"},
+			Subsets: []corev1.EndpointSubset{
+				{
+					Addresses: []corev1.EndpointAddress{
+						{IP: "10.0.0.1", TargetRef: &corev1.ObjectReference{Name: "pod-1", Kind: "Pod"}},
+						{IP: "11.0.0.1", TargetRef: &corev1.ObjectReference{Name: "pod-2", Kind: "Pod"}},
+					},
+					Ports: []corev1.EndpointPort{
+						{Name: "http", Port: 80},
+					},
+				},
+				{
+					NotReadyAddresses: []corev1.EndpointAddress{
+						{IP: "10.0.0.2", TargetRef: &corev1.ObjectReference{Name: "pod-3", Kind: "Pod"}},
+						{IP: "11.0.0.2", TargetRef: &corev1.ObjectReference{Name: "pod-4", Kind: "Pod"}},
+					},
+					Ports: []corev1.EndpointPort{
+						{Name: "http", Port: 90},
+					},
+				},
+			},
+		},
+	}
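+
+	// With the subsets above, ResolvePodEndpoints should return all four pod
+	// endpoints (ready and not-ready alike), and GetListTargetPort should
+	// de-duplicate down to the two distinct target ports.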
+
+	// Create resolver
+	serviceLister := &mockServiceLister{services: services}
+	endpointsLister := &mockEndpointsLister{endpoints: endpoints}
+	resolver := NewDefaultEndpointResolver(serviceLister, endpointsLister)
+
+	// Test case
+	svcKey := types.NamespacedName{Namespace: "default", Name: "test-service"}
+	port := intstr.FromString("http")
+
+	// Act
+	endpointsResult, err := resolver.ResolvePodEndpoints(svcKey, port)
+
+	// Assert
+	assert.NoError(t, err)
+	assert.Equal(t, 4, len(endpointsResult))
+	assert.Contains(t, endpointsResult, EndpointAddress{IP: "10.0.0.1", Port: 80, Name: "pod-1"})
+	assert.Contains(t, endpointsResult, EndpointAddress{IP: "11.0.0.1", Port: 80, Name: "pod-2"})
+	assert.Contains(t, endpointsResult, EndpointAddress{IP: "10.0.0.2", Port: 90, Name: "pod-3"})
+	assert.Contains(t, endpointsResult, EndpointAddress{IP: "11.0.0.2", Port: 90, Name: "pod-4"})
+
+	listTargetPort, err := resolver.GetListTargetPort(svcKey, port)
+	assert.NoError(t, err)
+	assert.Equal(t, 2, len(listTargetPort))
+	assert.Contains(t, listTargetPort, 80)
+	assert.Contains(t, listTargetPort, 90)
+}
diff --git a/pkg/ingress/controller/controller.go b/pkg/ingress/controller/controller.go
index 8f838a2..9b7920b 100644
--- a/pkg/ingress/controller/controller.go
+++ b/pkg/ingress/controller/controller.go
@@ -12,7 +12,9 @@ import (
 	cuongpigerutils "github.com/cuongpiger/joat/utils"
 	"github.com/sirupsen/logrus"
 	"github.com/vngcloud/cloud-provider-vngcloud/pkg/client"
+	"github.com/vngcloud/cloud-provider-vngcloud/pkg/cni_detector"
 	"github.com/vngcloud/cloud-provider-vngcloud/pkg/consts"
+	"github.com/vngcloud/cloud-provider-vngcloud/pkg/endpoint_resolver"
 	"github.com/vngcloud/cloud-provider-vngcloud/pkg/ingress/config"
 	"github.com/vngcloud/cloud-provider-vngcloud/pkg/utils"
 	vErrors "github.com/vngcloud/cloud-provider-vngcloud/pkg/utils/errors"
@@ -30,6 +32,7 @@ import (
 	corev1 "k8s.io/api/core/v1"
 	nwv1 "k8s.io/api/networking/v1"
 	apimetav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/types"
 	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
 	"k8s.io/apimachinery/pkg/util/wait"
 	"k8s.io/client-go/informers"
@@ -101,6 +104,11 @@ type Controller struct {
 	workers             map[string]chan bool
 	resourceDependant   utils.ResourceDependant
+	endpointResolver endpoint_resolver.EndpointResolver
+	cniType          cni_detector.CNIType
+
+	// last applied Expander per load balancer, used to clean up stale resources on delete
+	cacheLoadBalancerBuilder map[string]*Expander
 }
 
 // NewController creates a new VngCloud Ingress controller.
@@ -131,21 +139,31 @@ func NewController(conf config.Config) *Controller { config: &conf, kubeClient: kubeClient, - queue: queue, - stopCh: make(chan struct{}), - informer: kubeInformerFactory, - recorder: recorder, - serviceLister: serviceInformer.Lister(), - serviceListerSynced: serviceInformer.Informer().HasSynced, - nodeLister: nodeInformer.Lister(), - nodeListerSynced: nodeInformer.Informer().HasSynced, - knownNodes: []*corev1.Node{}, - trackLBUpdate: utils.NewUpdateTracker(), - numOfUpdatingThread: 0, - queues: make(map[string][]interface{}), - workers: make(map[string]chan bool), - resourceDependant: utils.NewResourceDependant(), + queue: queue, + stopCh: make(chan struct{}), + informer: kubeInformerFactory, + recorder: recorder, + serviceLister: serviceInformer.Lister(), + serviceListerSynced: serviceInformer.Informer().HasSynced, + nodeLister: nodeInformer.Lister(), + nodeListerSynced: nodeInformer.Informer().HasSynced, + knownNodes: []*corev1.Node{}, + trackLBUpdate: utils.NewUpdateTracker(), + numOfUpdatingThread: 0, + queues: make(map[string][]interface{}), + workers: make(map[string]chan bool), + resourceDependant: utils.NewResourceDependant(), + endpointResolver: nil, + cniType: cni_detector.UnknownCNI, + cacheLoadBalancerBuilder: make(map[string]*Expander), + } + + controller.cniType, err = cni_detector.NewDetector(controller.kubeClient).DetectCNIType() + if err != nil { + klog.Errorf("failed to detect CNI type: %v", err) + controller.cniType = cni_detector.UnknownCNI } + klog.Infof("Detected CNI type: %s", controller.cniType) ingInformer := kubeInformerFactory.Networking().V1().Ingresses() _, err = ingInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ @@ -276,6 +294,7 @@ func NewController(conf config.Config) *Controller { controller.endpointLister = endpointInformer.Lister() controller.endpointListerSynced = endpointInformer.Informer().HasSynced + controller.endpointResolver = endpoint_resolver.NewDefaultEndpointResolver(controller.serviceLister, controller.endpointLister) return controller } @@ -702,9 +721,14 @@ func (c *Controller) DeleteLoadbalancer(ing *nwv1.Ingress) error { return err } - oldIngExpander, err := c.inspectIngress(ing) - if err != nil { - oldIngExpander, _ = c.inspectIngress(nil) + var oldIngExpander *Expander + var ok bool + if oldIngExpander, ok = c.cacheLoadBalancerBuilder[fmt.Sprintf("%s/%s", ing.Namespace, ing.Name)]; !ok { + // build again + oldIngExpander, err = c.inspectIngress(ing) + if err != nil { + oldIngExpander, _ = c.inspectIngress(nil) + } } newIngExpander, err := c.inspectIngress(nil) if err != nil { @@ -789,6 +813,7 @@ func (c *Controller) DeleteLoadbalancer(ing *nwv1.Ingress) error { klog.Errorln("error when delete lb", err) return err } + delete(c.cacheLoadBalancerBuilder, fmt.Sprintf("%s/%s", ing.Namespace, ing.Name)) return nil } @@ -797,6 +822,7 @@ func (c *Controller) DeleteLoadbalancer(ing *nwv1.Ingress) error { klog.Errorln("error when compare ingress", err) return err } + delete(c.cacheLoadBalancerBuilder, fmt.Sprintf("%s/%s", ing.Namespace, ing.Name)) return nil } @@ -932,7 +958,6 @@ func (c *Controller) inspectIngress(ing *nwv1.Ingress) (*Expander, error) { klog.Errorf("Failed to retrieve current set of nodes from node lister: %v", err) return nil, err } - membersAddr := utils.GetNodeMembersAddr(nodesAfterFilter) // get subnetID of this ingress providerIDs := utils.GetListProviderID(nodesAfterFilter) @@ -979,68 +1004,70 @@ func (c *Controller) inspectIngress(ing *nwv1.Ingress) (*Expander, error) { serviceKey 
:= fmt.Sprintf("%s/%s", ing.ObjectMeta.Namespace, service.Name)
 				poolName := utils.GeneratePoolName(c.getClusterID(), ing.Namespace, ing.Name, consts.RESOURCE_TYPE_INGRESS, serviceKey, int(service.Port.Number))
-
-				targetPort, nodePort, err := utils.GetServiceNodePort(c.serviceLister, serviceKey, service)
-				if err != nil {
-					klog.Errorf("error when get node port: %v", err)
-					return nil, err
-				}
 
-				monitorPort := nodePort
-				if serviceConf.HealthcheckPort != 0 {
-					monitorPort = serviceConf.HealthcheckPort
-					if serviceConf.IsAutoCreateSecurityGroup {
-						ingressInspect.AddSecgroupRule(monitorPort, secgroup_rule.CreateOptsProtocolOptTCP)
+				members := make([]*pool.Member, 0)
+				var membersAddr []endpoint_resolver.EndpointAddress
+				serviceIntOrStrPort := c.endpointResolver.ServiceBackendToIntOrString(service.Port)
+
+				// add a security group rule for each target port; Cilium native routing needs these ports open
+				if serviceConf.IsAutoCreateSecurityGroup && c.cniType == cni_detector.CiliumNativeRouting {
+					// get the list of target ports
+					serviceTargetPortList, err := c.endpointResolver.GetListTargetPort(types.NamespacedName{Namespace: ing.Namespace, Name: service.Name},
+						serviceIntOrStrPort)
+					if err != nil {
+						klog.Errorf("error when get list target port: %v", err)
+						return nil, err
+					}
+					for _, targetPort := range serviceTargetPortList {
+						ingressInspect.AddSecgroupRule(targetPort, secgroup_rule.CreateOptsProtocolOptTCP)
 					}
 				}
 
-				members := make([]*pool.Member, 0)
+				// resolve and add members
 				if serviceConf.TargetType == TargetTypeIP {
-					endpoints, err := c.endpointLister.Endpoints(ing.Namespace).Get(service.Name)
+					membersAddr, err = c.endpointResolver.ResolvePodEndpoints(
+						types.NamespacedName{Namespace: ing.Namespace, Name: service.Name},
+						serviceIntOrStrPort)
 					if err != nil {
-						klog.Errorf("Failed to get endpoints: %v", err)
+						klog.Errorf("error when resolve pod endpoints: %v", err)
 						return nil, err
 					}
-					for _, subset := range endpoints.Subsets {
-						for _, addr := range subset.Addresses {
-							members = append(members, &pool.Member{
-								IpAddress:   addr.IP,
-								Port:        targetPort,
-								Backup:      false,
-								Weight:      1,
-								Name:        addr.TargetRef.Name,
-								MonitorPort: targetPort,
-							})
-						}
-						for _, addr := range subset.NotReadyAddresses {
-							members = append(members, &pool.Member{
-								IpAddress:   addr.IP,
-								Port:        targetPort,
-								Backup:      false,
-								Weight:      1,
-								Name:        addr.TargetRef.Name,
-								MonitorPort: targetPort,
-							})
-						}
-					}
 				} else {
-					for _, addr := range membersAddr {
-						members = append(members, &pool.Member{
-							IpAddress:   addr,
-							Port:        nodePort,
-							Backup:      false,
-							Weight:      1,
-							Name:        poolName,
-							MonitorPort: monitorPort,
-						})
+					membersAddr, err = c.endpointResolver.ResolveNodePortEndpoints(
+						types.NamespacedName{Namespace: ing.Namespace, Name: service.Name},
+						serviceIntOrStrPort, nodesAfterFilter)
+					if err != nil {
+						klog.Errorf("error when resolve node endpoints: %v", err)
+						return nil, err
+					}
+				}
+				for _, addr := range membersAddr {
+					monitorPort := addr.Port
+					if serviceConf.HealthcheckPort != 0 {
+						monitorPort = serviceConf.HealthcheckPort
+					}
+
+					if serviceConf.IsAutoCreateSecurityGroup {
+						ingressInspect.AddSecgroupRule(monitorPort, secgroup_rule.CreateOptsProtocolOptTCP)
 					}
+
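+					// one pool member per resolved endpoint: pod IP and target
+					// port for TargetTypeIP, node IP and NodePort otherwise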
+					members = append(members, &pool.Member{
+						IpAddress:   addr.IP,
+						Port:        addr.Port,
+						Backup:      false,
+						Weight:      1,
+						Name:        addr.Name,
+						MonitorPort: monitorPort,
+					})
 				}
 
 				poolOptions := serviceConf.CreatePoolOptions()
 				poolOptions.PoolName = poolName
 				poolOptions.Members = members
-				if serviceConf.IsAutoCreateSecurityGroup {
-					ingressInspect.AddSecgroupRule(int(nodePort), secgroup_rule.CreateOptsProtocolOptTCP)
-				}
 				return &utils.PoolExpander{
 					UUID:       "",
 					CreateOpts: *poolOptions,
@@ -1136,10 +1163,12 @@ func (c *Controller) inspectIngress(ing *nwv1.Ingress) (*Expander, error) {
 	}, nil
 }
 
-func (c *Controller) ensureCompareIngress(oldIng, ing *nwv1.Ingress) (*lObjects.LoadBalancer, error) {
+func (c *Controller) ensureCompareIngress(_, ing *nwv1.Ingress) (*lObjects.LoadBalancer, error) {
 
-	oldIngExpander, err := c.inspectIngress(oldIng)
-	if err != nil {
+	var oldIngExpander *Expander
+	var ok bool
+	if oldIngExpander, ok = c.cacheLoadBalancerBuilder[fmt.Sprintf("%s/%s", ing.Namespace, ing.Name)]; !ok {
+		// build again
 		oldIngExpander, _ = c.inspectIngress(nil)
 	}
 	newIngExpander, err := c.inspectIngress(ing)
@@ -1163,6 +1192,7 @@ func (c *Controller) ensureCompareIngress(oldIng, ing *nwv1.Ingress) (*lObjects.
 		klog.Errorln("error when compare ingress", err)
 		return nil, err
 	}
+	c.cacheLoadBalancerBuilder[fmt.Sprintf("%s/%s", ing.Namespace, ing.Name)] = newIngExpander
 	c.resourceDependant.SetIngress(ing, newIngExpander.serviceConf.TargetType == TargetTypeIP)
 	return lb, nil
 }
diff --git a/pkg/vngcloud/vcontainer.go b/pkg/vngcloud/vcontainer.go
index abb58b2..6534e8a 100644
--- a/pkg/vngcloud/vcontainer.go
+++ b/pkg/vngcloud/vcontainer.go
@@ -69,7 +69,6 @@ func (s *VContainer) LoadBalancer() (lcloudProvider.LoadBalancer, bool) {
 		vLbConfig:         s.vLbOpts,
 		config:            s.config,
 		trackLBUpdate:     utils.NewUpdateTracker(),
-		serviceCache:      make(map[string]*corev1.Service),
 		isReApplyNextTime: false,
 		knownNodes:        []*corev1.Node{},
 	}
diff --git a/pkg/vngcloud/vlb.go b/pkg/vngcloud/vlb.go
index f1760aa..1f0410c 100644
--- a/pkg/vngcloud/vlb.go
+++ b/pkg/vngcloud/vlb.go
@@ -11,7 +11,9 @@ import (
 	"github.com/sirupsen/logrus"
 	lSdkClient "github.com/vngcloud/cloud-provider-vngcloud/pkg/client"
+	"github.com/vngcloud/cloud-provider-vngcloud/pkg/cni_detector"
 	"github.com/vngcloud/cloud-provider-vngcloud/pkg/consts"
+	"github.com/vngcloud/cloud-provider-vngcloud/pkg/endpoint_resolver"
 	lMetrics "github.com/vngcloud/cloud-provider-vngcloud/pkg/metrics"
 	"github.com/vngcloud/cloud-provider-vngcloud/pkg/utils"
 	vErrors "github.com/vngcloud/cloud-provider-vngcloud/pkg/utils/errors"
@@ -22,7 +24,10 @@ import (
 	"github.com/vngcloud/vngcloud-go-sdk/vngcloud/services/loadbalancer/v2/listener"
 	"github.com/vngcloud/vngcloud-go-sdk/vngcloud/services/loadbalancer/v2/loadbalancer"
 	"github.com/vngcloud/vngcloud-go-sdk/vngcloud/services/loadbalancer/v2/pool"
+	"github.com/vngcloud/vngcloud-go-sdk/vngcloud/services/network/v2/extensions/secgroup_rule"
 	corev1 "k8s.io/api/core/v1"
+	"k8s.io/apimachinery/pkg/types"
+	"k8s.io/apimachinery/pkg/util/intstr"
 	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
 	"k8s.io/apimachinery/pkg/util/wait"
 	"k8s.io/client-go/informers"
@@ -67,7 +72,6 @@ type (
 		kubeClient    kubernetes.Interface
 		eventRecorder record.EventRecorder
 		vLbConfig     VLbOpts
-		serviceCache  map[string]*corev1.Service
 		knownNodes    []*corev1.Node
 
 		serviceLister corelisters.ServiceLister
@@ -85,6 +89,11 @@ type (
 		stringKeyLock     *utils.StringKeyLock
 		resourceDependant utils.ResourceDependant
+		endpointResolver endpoint_resolver.EndpointResolver
+		cniType          cni_detector.CNIType
+
+		// last applied Expander per load balancer, used to clean up stale resources on delete
+		cacheLoadBalancerBuilder map[string]*Expander
 	}
 
 	// Config is the configuration for the VNG CLOUD load balancer controller,
@@ -111,9 +120,18 @@ func (c *vLB) Init() {
 	c.numOfUpdatingThread = 0
 	c.stringKeyLock = utils.NewStringKeyLock()
 	c.resourceDependant = utils.NewResourceDependant()
+	
c.cacheLoadBalancerBuilder = make(map[string]*Expander) + + var err error + c.cniType, err = cni_detector.NewDetector(c.kubeClient).DetectCNIType() + if err != nil { + klog.Errorf("failed to detect CNI type: %v", err) + c.cniType = cni_detector.UnknownCNI + } + klog.Infof("Detected CNI type: %s", c.cniType) endpointInformer := kubeInformerFactory.Core().V1().Endpoints() - _, err := endpointInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ + _, err = endpointInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { object := obj.(*corev1.Endpoints) requests := c.resourceDependant.GetServiceNeedReconcile("endpoint", object.GetNamespace(), object.GetName()) @@ -166,6 +184,8 @@ func (c *vLB) Init() { c.endpointLister = endpointInformer.Lister() c.endpointListerSynced = endpointInformer.Informer().HasSynced + c.endpointResolver = endpoint_resolver.NewDefaultEndpointResolver(c.serviceLister, c.endpointLister) + defer close(c.stopCh) go c.informer.Start(c.stopCh) @@ -265,9 +285,11 @@ func (c *vLB) ensureLoadBalancer( }() serviceKey := fmt.Sprintf("%s/%s", pService.Namespace, pService.Name) - oldIngExpander, _ := c.inspectService(nil, pNodes) - if oldService, ok := c.serviceCache[serviceKey]; ok { - oldIngExpander, _ = c.inspectService(oldService, pNodes) + var oldIngExpander *Expander + var ok bool + if oldIngExpander, ok = c.cacheLoadBalancerBuilder[serviceKey]; !ok { + // build again + oldIngExpander, _ = c.inspectService(nil, pNodes) } newIngExpander, err := c.inspectService(pService, pNodes) if err != nil { @@ -296,7 +318,7 @@ func (c *vLB) ensureLoadBalancer( userLb, _ := vngcloudutil.GetLB(c.vLBSC, c.getProjectID(), lb.UUID) c.trackLBUpdate.AddUpdateTracker(userLb.UUID, fmt.Sprintf("%s/%s", pService.Namespace, pService.Name), userLb.UpdatedAt) - c.serviceCache[serviceKey] = pService.DeepCopy() + c.cacheLoadBalancerBuilder[serviceKey] = newIngExpander klog.Infof( "Load balancer %s for service %s/%s is ready to use for Kubernetes controller\n----- DONE ----- ", @@ -351,10 +373,16 @@ func (c *vLB) ensureDeleteLoadBalancer(_ context.Context, _ string, pService *co return err } - oldIngExpander, err := c.inspectService(pService, c.knownNodes) - if err != nil { - oldIngExpander, _ = c.inspectService(nil, c.knownNodes) + var oldIngExpander *Expander + var ok bool + if oldIngExpander, ok = c.cacheLoadBalancerBuilder[fmt.Sprintf("%s/%s", pService.Namespace, pService.Name)]; !ok { + // build again + oldIngExpander, err = c.inspectService(pService, c.knownNodes) + if err != nil { + oldIngExpander, _ = c.inspectService(nil, c.knownNodes) + } } + newIngExpander, err := c.inspectService(nil, c.knownNodes) if err != nil { klog.Errorln("error when inspect new service:", err) @@ -427,6 +455,7 @@ func (c *vLB) ensureDeleteLoadBalancer(_ context.Context, _ string, pService *co klog.Errorln("error when delete lb", err) return err } + delete(c.cacheLoadBalancerBuilder, fmt.Sprintf("%s/%s", pService.Namespace, pService.Name)) return nil } @@ -435,6 +464,7 @@ func (c *vLB) ensureDeleteLoadBalancer(_ context.Context, _ string, pService *co klog.Errorln("error when compare service", err) return err } + delete(c.cacheLoadBalancerBuilder, fmt.Sprintf("%s/%s", pService.Namespace, pService.Name)) return nil } @@ -566,7 +596,6 @@ func (c *vLB) inspectService(pService *corev1.Service, pNodes []*corev1.Node) (* klog.Errorf("No nodes found in the cluster") return nil, vErrors.ErrNoNodeAvailable } - membersAddr := utils.GetNodeMembersAddr(nodesAfterFilter) // 
get subnetID of this ingress
 	providerIDs := utils.GetListProviderID(pNodes)
@@ -603,74 +632,77 @@ func (c *vLB) inspectService(pService *corev1.Service, pNodes []*corev1.Node) (*
 		poolName := serviceConf.GenPoolName(c.getClusterID(), pService, consts.RESOURCE_TYPE_SERVICE, port)
 		listenerName := serviceConf.GenListenerName(c.getClusterID(), pService, consts.RESOURCE_TYPE_SERVICE, port)
 
-		monitorPort := int(port.NodePort)
 		if serviceConf.HealthcheckPort != 0 {
-			monitorPort = serviceConf.HealthcheckPort
 			if serviceConf.IsAutoCreateSecurityGroup {
-				ingressInspect.AddSecgroupRule(monitorPort,
+				ingressInspect.AddSecgroupRule(serviceConf.HealthcheckPort,
 					vngcloudutil.HealthcheckProtocoToSecGroupProtocol(string(port.Protocol)))
 				if strings.EqualFold(string(port.Protocol), "UDP") {
-					ingressInspect.AddSecgroupRule(monitorPort,
+					ingressInspect.AddSecgroupRule(serviceConf.HealthcheckPort,
 						vngcloudutil.HealthcheckProtocoToSecGroupProtocol("ICMP"))
 				}
 			}
 		}
 
+		var membersAddr []endpoint_resolver.EndpointAddress
 		members := make([]*pool.Member, 0)
-		if serviceConf.TargetType == TargetTypeIP {
-			endpoints, err := c.endpointLister.Endpoints(pService.Namespace).Get(pService.Name)
+
+		// add a security group rule for each target port; Cilium native routing needs these ports open
+		if serviceConf.IsAutoCreateSecurityGroup && c.cniType == cni_detector.CiliumNativeRouting {
+			// get the list of target ports
+			serviceTargetPortList, err := c.endpointResolver.GetListTargetPort(types.NamespacedName{Namespace: pService.Namespace, Name: pService.Name},
+				intstr.FromInt(int(port.Port)))
 			if err != nil {
-				klog.Errorf("Failed to get endpoints: %v", err)
+				klog.Errorf("error when get list target port: %v", err)
 				return nil, err
 			}
-			for _, subset := range endpoints.Subsets {
-				for _, addr := range subset.Addresses {
-					members = append(members, &pool.Member{
-						IpAddress:   addr.IP,
-						Port:        port.TargetPort.IntValue(),
-						Backup:      false,
-						Weight:      1,
-						Name:        addr.TargetRef.Name,
-						MonitorPort: port.TargetPort.IntValue(),
-					})
-				}
-				for _, addr := range subset.NotReadyAddresses {
-					members = append(members, &pool.Member{
-						IpAddress:   addr.IP,
-						Port:        port.TargetPort.IntValue(),
-						Backup:      false,
-						Weight:      1,
-						Name:        addr.TargetRef.Name,
-						MonitorPort: port.TargetPort.IntValue(),
-					})
-				}
+			for _, targetPort := range serviceTargetPortList {
+				ingressInspect.AddSecgroupRule(targetPort, secgroup_rule.CreateOptsProtocolOptTCP)
+			}
+		}
+
+		// resolve and add members
+		if serviceConf.TargetType == TargetTypeIP {
+			membersAddr, err = c.endpointResolver.ResolvePodEndpoints(
+				types.NamespacedName{Namespace: pService.Namespace, Name: pService.Name}, intstr.FromInt(int(port.Port)))
+			if err != nil {
+				klog.Errorf("Failed to resolve pod endpoints: %v", err)
+				return nil, err
 			}
 		} else {
-			for _, addr := range membersAddr {
-				members = append(members, &pool.Member{
-					IpAddress:   addr,
-					Port:        int(port.NodePort),
-					Backup:      false,
-					Weight:      1,
-					Name:        poolName,
-					MonitorPort: monitorPort,
-				})
+			membersAddr, err = c.endpointResolver.ResolveNodePortEndpoints(
+				types.NamespacedName{Namespace: pService.Namespace, Name: pService.Name}, intstr.FromInt(int(port.Port)), nodesAfterFilter)
+			if err != nil {
+				klog.Errorf("Failed to resolve node port endpoints: %v", err)
+				return nil, err
 			}
 		}
-		poolOptions := serviceConf.CreatePoolOptions(port)
-		poolOptions.PoolName = poolName
-		poolOptions.Members = members
+		for _, addr := range membersAddr {
+			monitorPort := addr.Port
+			if serviceConf.HealthcheckPort != 0 {
+				monitorPort = serviceConf.HealthcheckPort
+			}
 
-		if 
serviceConf.IsAutoCreateSecurityGroup { if serviceConf.IsAutoCreateSecurityGroup { - ingressInspect.AddSecgroupRule(int(port.NodePort), + ingressInspect.AddSecgroupRule(addr.Port, vngcloudutil.HealthcheckProtocoToSecGroupProtocol(string(port.Protocol))) if strings.EqualFold(string(port.Protocol), "UDP") { ingressInspect.AddSecgroupRule(monitorPort, vngcloudutil.HealthcheckProtocoToSecGroupProtocol("ICMP")) } } + + members = append(members, &pool.Member{ + IpAddress: addr.IP, + Port: addr.Port, + Backup: false, + Weight: 1, + Name: addr.Name, + MonitorPort: monitorPort, + }) } + poolOptions := serviceConf.CreatePoolOptions(port) + poolOptions.PoolName = poolName + poolOptions.Members = members listenerOptions := serviceConf.CreateListenerOptions(port) listenerOptions.ListenerName = listenerName @@ -1138,8 +1170,14 @@ func (c *vLB) ensureSecurityGroups(oldInspect, inspect *Expander) error { vngcloudutil.WaitForServerActive(c.vServerSC, c.getProjectID(), instanceID) return err } + + // ensure security groups for all instances + oldInspectSecgroups := make([]string, 0) + if oldInspect != nil && oldInspect.serviceConf != nil && oldInspect.serviceConf.SecurityGroups != nil { + oldInspectSecgroups = oldInspect.serviceConf.SecurityGroups + } for _, instanceID := range inspect.InstanceIDs { - err := ensureSecGroupsForInstance(instanceID, oldInspect.serviceConf.SecurityGroups, validSecgroups) + err := ensureSecGroupsForInstance(instanceID, oldInspectSecgroups, validSecgroups) if err != nil { klog.Errorln("error when ensure security groups for instance", err) }