diff --git a/pkg/instancegroups/manager.go b/pkg/instancegroups/manager.go
index 5355b7a7c1..1f14e4297b 100644
--- a/pkg/instancegroups/manager.go
+++ b/pkg/instancegroups/manager.go
@@ -190,34 +190,6 @@ func (m *manager) DeleteInstanceGroup(name string) error {
 	return fmt.Errorf("%v", errs)
 }
-// listIGInstances lists all instances of provided instance group name in all zones.
-// The return format will be a set of nodes in the instance group and
-// a map from node name to zone.
-func (m *manager) listIGInstances(name string) (sets.String, map[string]string, error) {
-	nodeNames := sets.NewString()
-	nodeZoneMap := make(map[string]string)
-	zones, err := m.ZoneGetter.ListZones(zonegetter.AllNodesFilter, m.logger)
-	if err != nil {
-		return nodeNames, nodeZoneMap, err
-	}
-
-	for _, zone := range zones {
-		instances, err := m.cloud.ListInstancesInInstanceGroup(name, zone, allInstances)
-		if err != nil {
-			return nodeNames, nodeZoneMap, err
-		}
-		for _, ins := range instances {
-			name, err := utils.KeyName(ins.Instance)
-			if err != nil {
-				return nodeNames, nodeZoneMap, err
-			}
-			nodeNames.Insert(name)
-			nodeZoneMap[name] = zone
-		}
-	}
-	return nodeNames, nodeZoneMap, nil
-}
-
 // Get returns the Instance Group by name.
 func (m *manager) Get(name, zone string) (*compute.InstanceGroup, error) {
 	ig, err := m.cloud.GetInstanceGroup(name, zone)
@@ -288,68 +260,32 @@ func (m *manager) getInstanceReferences(zone string, nodeNames []string) (refs [
 }
 // Add adds the given instances to the appropriately zoned Instance Group.
-func (m *manager) add(groupName string, names []string) error {
-	events.GlobalEventf(m.recorder, core.EventTypeNormal, events.AddNodes, "Adding %s to InstanceGroup %q", events.TruncatedStringList(names), groupName)
-	var errs []error
-	for zone, nodeNames := range m.splitNodesByZone(names) {
-		m.logger.V(2).Info("Adding nodes to instance group in zone", "nodeCount", len(nodeNames), "name", groupName, "zone", zone)
-		err := m.cloud.AddInstancesToInstanceGroup(groupName, zone, m.getInstanceReferences(zone, nodeNames))
-		if err != nil {
-			if utils.IsMemberAlreadyExistsError(err) {
-				m.logger.V(2).Info("Instance already in instance group, skipping the api error: %v, ", err)
-			} else {
-				errs = append(errs, err)
-			}
-		}
-	}
-	if len(errs) == 0 {
-		return nil
+func (m *manager) add(groupName string, nodeNames []string, zone string) error {
+	events.GlobalEventf(m.recorder, core.EventTypeNormal, events.AddNodes, "Adding %s to InstanceGroup %q", events.TruncatedStringList(nodeNames), groupName)
+	m.logger.V(1).Info("Adding nodes to instance group in zone", "nodeCount", len(nodeNames), "name", groupName, "zone", zone)
+	err := m.cloud.AddInstancesToInstanceGroup(groupName, zone, m.getInstanceReferences(zone, nodeNames))
+	if err != nil && !utils.IsMemberAlreadyExistsError(err) {
+		events.GlobalEventf(m.recorder, core.EventTypeWarning, events.AddNodes, "Error adding %s to InstanceGroup %q: %v", events.TruncatedStringList(nodeNames), groupName, err)
+		return err
 	}
-
-	err := fmt.Errorf("AddInstances: %v", errs)
-	events.GlobalEventf(m.recorder, core.EventTypeWarning, events.AddNodes, "Error adding %s to InstanceGroup %q: %v", events.TruncatedStringList(names), groupName, err)
-	return err
+	return nil
 }
 // Remove removes the given instances from the appropriately zoned Instance Group.
-func (m *manager) remove(groupName string, names []string, nodeZoneMap map[string]string) error {
-	events.GlobalEventf(m.recorder, core.EventTypeNormal, events.RemoveNodes, "Removing %s from InstanceGroup %q", events.TruncatedStringList(names), groupName)
-	var errs []error
+func (m *manager) remove(groupName string, nodeNames []string, zone string) error {
+	events.GlobalEventf(m.recorder, core.EventTypeNormal, events.RemoveNodes, "Removing %s from InstanceGroup %q", events.TruncatedStringList(nodeNames), groupName)
-	// Get the zone information from nameZoneMap instead of ZoneGetter.
-	// Since the ZoneGetter is based on k8s nodes but in most remove cases,
-	// k8s nodes do not exist. It will be impossible to get zone infromation.
-	nodesByZone := map[string][]string{}
-	for _, name := range names {
-		zone, ok := nodeZoneMap[name]
-		if !ok {
-			m.logger.Error(nil, "Failed to get zones for node, skipping", "name", name)
-			continue
-		}
-		if _, ok := nodesByZone[zone]; !ok {
-			nodesByZone[zone] = []string{}
-		}
-		nodesByZone[zone] = append(nodesByZone[zone], name)
-	}
-
-	for zone, nodeNames := range nodesByZone {
-		m.logger.V(1).Info("Removing nodes from instance group in zone", "nodeCount", len(nodeNames), "name", groupName, "zone", zone)
-		if err := m.cloud.RemoveInstancesFromInstanceGroup(groupName, zone, m.getInstanceReferences(zone, nodeNames)); err != nil {
-			errs = append(errs, err)
-		}
-	}
-	if len(errs) == 0 {
-		return nil
+	m.logger.V(1).Info("Removing nodes from instance group in zone", "nodeCount", len(nodeNames), "name", groupName, "zone", zone)
+	if err := m.cloud.RemoveInstancesFromInstanceGroup(groupName, zone, m.getInstanceReferences(zone, nodeNames)); err != nil {
+		events.GlobalEventf(m.recorder, core.EventTypeWarning, events.RemoveNodes, "Error removing nodes %s from InstanceGroup %q: %v", events.TruncatedStringList(nodeNames), groupName, err)
+		return err
 	}
-
-	err := fmt.Errorf("RemoveInstances: %v", errs)
-	events.GlobalEventf(m.recorder, core.EventTypeWarning, events.RemoveNodes, "Error removing nodes %s from InstanceGroup %q: %v", events.TruncatedStringList(names), groupName, err)
-	return err
+	return nil
 }
 // Sync nodes with the instances in the instance group.
 func (m *manager) Sync(nodes []string) (err error) {
-	m.logger.V(2).Info("Syncing nodes", "nodes", nodes)
+	m.logger.V(2).Info("Syncing nodes", "nodes", events.TruncatedStringList(nodes))
 	defer func() {
 		// The node pool is only responsible for syncing nodes to instance
@@ -364,56 +300,47 @@ func (m *manager) Sync(nodes []string) (err error) {
 		}
 	}()
-	pool, err := m.List()
-	if err != nil {
-		m.logger.Error(err, "List error")
-		return err
-	}
+	// For each zone, add up to m.maxIGSize nodes to the instance group.
+	// If there are more, truncate the last nodes (in alphabetical order).
+	// The logic should be consistent with cloud-provider-gcp's Legacy L4 ILB Controller:
+	// https://github.com/kubernetes/cloud-provider-gcp/blob/fca628cb3bf9267def0abb509eaae87d2d4040f3/providers/gce/gce_loadbalancer_internal.go#L606C1-L675C1
+	// m.maxIGSize should be set to 1000, as it is in cloud-provider-gcp.
+	zonedNodes := m.splitNodesByZone(nodes)
+	for zone, kubeNodesFromZone := range zonedNodes {
+		igName := m.namer.InstanceGroup()
+		if len(kubeNodesFromZone) > m.maxIGSize {
+			sortedKubeNodesFromZone := sets.NewString(kubeNodesFromZone...).List()
+			loggableNodeList := events.TruncatedStringList(sortedKubeNodesFromZone[m.maxIGSize:])
+			m.logger.Info(fmt.Sprintf("Total number of kubeNodes: %d, truncating to maximum Instance Group size = %d. zone: %s. First truncated instances: %v", len(kubeNodesFromZone), m.maxIGSize, zone, loggableNodeList))
+			kubeNodesFromZone = sortedKubeNodesFromZone[:m.maxIGSize]
+		}
+
+		kubeNodes := sets.NewString(kubeNodesFromZone...)
-	for _, igName := range pool {
-		// Keep the zone information for each node in this map.
-		// This will be used as a reference to get zone information
-		// when removing nodes.
-		gceNodes, gceNodeZoneMap, err := m.listIGInstances(igName)
+		gceNodes := sets.NewString()
+		instances, err := m.cloud.ListInstancesInInstanceGroup(igName, zone, allInstances)
 		if err != nil {
-			m.logger.Error(err, "listIGInstances error", "name", igName)
+			m.logger.Error(err, "Failed to list instances in instance group", "zone", zone, "igName", igName)
 			return err
 		}
-		kubeNodes := sets.NewString(nodes...)
-
-		// Individual InstanceGroup has a limit for 1000 instances in it.
-		// As a result, it's not possible to add more to it.
-		if len(kubeNodes) > m.maxIGSize {
-			// List() will return a sorted list so the kubeNodesList truncation will have a stable set of nodes.
-			kubeNodesList := kubeNodes.List()
-
-			// Store first 10 truncated nodes for logging
-			truncateForLogs := func(nodes []string) []string {
-				maxLogsSampleSize := 10
-				if len(nodes) <= maxLogsSampleSize {
-					return nodes
-				}
-				return nodes[:maxLogsSampleSize]
+		for _, ins := range instances {
+			instance, err := utils.KeyName(ins.Instance)
+			if err != nil {
+				m.logger.Error(err, "Failed to read instance name from URL, skipping single instance", "Instance URL", ins.Instance)
+				continue
 			}
-
-			m.logger.Info(fmt.Sprintf("Total number of kubeNodes: %d, truncating to maximum Instance Group size = %d. Instance group name: %s. First truncated instances: %v", len(kubeNodesList), m.maxIGSize, igName, truncateForLogs(nodes[m.maxIGSize:])))
-			kubeNodes = sets.NewString(kubeNodesList[:m.maxIGSize]...)
+			gceNodes.Insert(instance)
 		}
-		// A node deleted via kubernetes could still exist as a gce vm. We don't
-		// want to route requests to it. Similarly, a node added to kubernetes
-		// needs to get added to the instance group so we do route requests to it.
-
 	removeNodes := gceNodes.Difference(kubeNodes).List()
 	addNodes := kubeNodes.Difference(gceNodes).List()
-	m.logger.V(2).Info("Removing nodes", "removeNodes", removeNodes)
-	m.logger.V(2).Info("Adding nodes", "addNodes", addNodes)
+	m.logger.V(2).Info("Removing nodes", "removeNodes", events.TruncatedStringList(removeNodes))
+	m.logger.V(2).Info("Adding nodes", "addNodes", events.TruncatedStringList(addNodes))
 	start := time.Now()
 	if len(removeNodes) != 0 {
-		err = m.remove(igName, removeNodes, gceNodeZoneMap)
-		m.logger.V(2).Info("Remove finished", "name", igName, "err", err, "timeTaken", time.Now().Sub(start), "removeNodes", removeNodes)
+		err = m.remove(igName, removeNodes, zone)
+		m.logger.V(2).Info("Remove finished", "name", igName, "err", err, "timeTaken", time.Now().Sub(start), "removeNodes", events.TruncatedStringList(removeNodes))
 		if err != nil {
 			return err
 		}
@@ -421,8 +348,8 @@ func (m *manager) Sync(nodes []string) (err error) {
 	start = time.Now()
 	if len(addNodes) != 0 {
-		err = m.add(igName, addNodes)
-		m.logger.V(2).Info("Add finished", "name", igName, "err", err, "timeTaken", time.Now().Sub(start), "addNodes", addNodes)
+		err = m.add(igName, addNodes, zone)
+		m.logger.V(2).Info("Add finished", "name", igName, "err", err, "timeTaken", time.Now().Sub(start), "addNodes", events.TruncatedStringList(addNodes))
 		if err != nil {
 			return err
 		}
diff --git a/pkg/instancegroups/manager_test.go b/pkg/instancegroups/manager_test.go
index 837ef38231..4c554de82a 100644
--- a/pkg/instancegroups/manager_test.go
+++ b/pkg/instancegroups/manager_test.go
@@ -34,6 +34,9 @@ import (
 const (
 	defaultTestZone = "default-zone"
+	testZoneA       = "dark-moon1-a"
+	testZoneB       = "dark-moon1-b"
+	testZoneC       = "dark-moon1-c"
 	basePath        = "/basepath/projects/project-id/"
 	defaultTestSubnetURL = "https://www.googleapis.com/compute/v1/projects/proj/regions/us-central1/subnetworks/default"
@@ -41,7 +44,7 @@ const (
 var defaultNamer = namer.NewNamer("uid1", "fw1", klog.TODO())
-func newNodePool(f Provider, zone string, maxIGSize int) Manager {
+func newNodePool(f Provider, maxIGSize int) Manager {
 	nodeInformer := zonegetter.FakeNodeInformer()
 	fakeZoneGetter := zonegetter.NewFakeZoneGetter(nodeInformer, defaultTestSubnetURL, false)
@@ -102,7 +105,7 @@ func TestNodePoolSync(t *testing.T) {
 			}
 			fakeGCEInstanceGroups := NewFakeInstanceGroups(zonesToIGs, maxIGSize)
-			pool := newNodePool(fakeGCEInstanceGroups, defaultTestZone, maxIGSize)
+			pool := newNodePool(fakeGCEInstanceGroups, maxIGSize)
 			for _, kubeNode := range testCase.kubeNodes.List() {
 				manager := pool.(*manager)
 				zonegetter.AddFakeNodes(manager.ZoneGetter, defaultTestZone, kubeNode)
@@ -170,7 +173,7 @@ func TestInstanceAlreadyMemberOfIG(t *testing.T) {
 	fakeInstanceGroups := new(fakeIGAlreadyExists)
 	fakeInstanceGroups.FakeInstanceGroups = NewFakeInstanceGroups(map[string]IGsToInstances{}, maxIGSize)
-	pool := newNodePool(fakeInstanceGroups, defaultTestZone, maxIGSize)
+	pool := newNodePool(fakeInstanceGroups, maxIGSize)
 	for _, kubeNode := range kubeNodes.List() {
 		manager := pool.(*manager)
 		zonegetter.AddFakeNodes(manager.ZoneGetter, defaultTestZone, kubeNode)
@@ -210,6 +213,146 @@ func (fakeIG *fakeIGAlreadyExists) AddInstancesToInstanceGroup(name, zone string
 	}
 }
+func TestNodePoolSyncHugeCluster(t *testing.T) {
+	// For the sake of easier debugging, cap the instance group size at 3.
+	maxIGSize := 3
+
+	testCases := []struct {
+		description    string
+		gceNodesZoneA  sets.String
+		gceNodesZoneB  sets.String
+		gceNodesZoneC  sets.String
+		kubeNodesZoneA sets.String
+		kubeNodesZoneB sets.String
+		kubeNodesZoneC sets.String
+	}{
+		{
+			description:    "too many kube nodes in 1 of 3 zones",
+			gceNodesZoneA:  getNodeSlice("nodes-zone-a", maxIGSize),
+			gceNodesZoneB:  getNodeSlice("nodes-zone-b", 2*maxIGSize),
+			gceNodesZoneC:  getNodeSlice("nodes-zone-c", maxIGSize),
+			kubeNodesZoneA: getNodeSlice("nodes-zone-a", maxIGSize),
+			kubeNodesZoneB: getNodeSlice("nodes-zone-b", 2*maxIGSize),
+			kubeNodesZoneC: getNodeSlice("nodes-zone-c", maxIGSize),
+		},
+		{
+			description:    "too many kube nodes in 2 of 3 zones",
+			gceNodesZoneA:  getNodeSlice("nodes-zone-a", maxIGSize),
+			gceNodesZoneB:  getNodeSlice("nodes-zone-b", 2*maxIGSize),
+			gceNodesZoneC:  getNodeSlice("nodes-zone-c", 2*maxIGSize),
+			kubeNodesZoneA: getNodeSlice("nodes-zone-a", maxIGSize),
+			kubeNodesZoneB: getNodeSlice("nodes-zone-b", 2*maxIGSize+1),
+			kubeNodesZoneC: getNodeSlice("nodes-zone-c", 2*maxIGSize+2),
+		},
+		{
+			description:    "too many kube nodes in 3 of 3 zones",
+			gceNodesZoneA:  getNodeSlice("nodes-zone-a", 2*maxIGSize),
+			gceNodesZoneB:  getNodeSlice("nodes-zone-b", 2*maxIGSize+1),
+			gceNodesZoneC:  getNodeSlice("nodes-zone-c", 2*maxIGSize+2),
+			kubeNodesZoneA: getNodeSlice("nodes-zone-a", 2*maxIGSize),
+			kubeNodesZoneB: getNodeSlice("nodes-zone-b", 2*maxIGSize),
+			kubeNodesZoneC: getNodeSlice("nodes-zone-c", 2*maxIGSize),
+		},
+	}
+
+	for _, tc := range testCases {
+		t.Run(tc.description, func(t *testing.T) {
+
+			igName := defaultNamer.InstanceGroup()
+			fakeGCEInstanceGroups := NewFakeInstanceGroups(map[string]IGsToInstances{}, maxIGSize)
+			pool := newNodePool(fakeGCEInstanceGroups, maxIGSize)
+			manager := pool.(*manager)
+			zonegetter.AddFakeNodes(manager.ZoneGetter, testZoneA, tc.gceNodesZoneA.List()...)
+			zonegetter.AddFakeNodes(manager.ZoneGetter, testZoneB, tc.gceNodesZoneB.List()...)
+			zonegetter.AddFakeNodes(manager.ZoneGetter, testZoneC, tc.gceNodesZoneC.List()...)
+
+			ports := []int64{80}
+			_, err := pool.EnsureInstanceGroupsAndPorts(igName, ports)
+			if err != nil {
+				t.Fatalf("pool.EnsureInstanceGroupsAndPorts(%s, %v) returned error %v, want nil", igName, ports, err)
+			}
+			allKubeNodes := append(tc.kubeNodesZoneA.List(), tc.kubeNodesZoneB.List()...)
+			allKubeNodes = append(allKubeNodes, tc.kubeNodesZoneC.List()...)
+
+			// Execute manager's main instance group sync function
+			err = pool.Sync(allKubeNodes)
+			if err != nil {
+				t.Fatalf("pool.Sync(_) returned error %v, want nil", err)
+			}
+
+			// Check that the instance group in each zone has at most `maxIGSize` nodes,
+			// including zones with 2*maxIGSize kube nodes.
+			for _, zone := range []string{testZoneA, testZoneB, testZoneC} {
+				numberOfIGsInZone := len(fakeGCEInstanceGroups.zonesToIGsToInstances[zone])
+				if numberOfIGsInZone != 1 {
+					t.Errorf("Unexpected instance group added, got %v, want: 1", numberOfIGsInZone)
+				}
+				for _, igToInstances := range fakeGCEInstanceGroups.zonesToIGsToInstances[zone] {
+					t.Logf("Number of nodes in instance group in zone %v: %v", zone, len(igToInstances))
+					if len(igToInstances) > maxIGSize {
+						t.Errorf("unexpected number of nodes in instance group from zone: %v, got %v, want: %v", zone, len(igToInstances), maxIGSize)
+					}
+				}
+			}
+
+			apiCallsCountBeforeSync := len(fakeGCEInstanceGroups.calls)
+			err = pool.Sync(allKubeNodes)
+			if err != nil {
+				t.Fatalf("pool.Sync(_) returned error %v, want nil", err)
+			}
+			apiCallsCountAfterSync := len(fakeGCEInstanceGroups.calls)
+			if apiCallsCountBeforeSync != apiCallsCountAfterSync {
+				t.Errorf("Should skip sync when called a second time with the same kubeNodes. apiCallsCountBeforeSync = %d, apiCallsCountAfterSync = %d", apiCallsCountBeforeSync, apiCallsCountAfterSync)
+			}
+		})
+	}
+}
+
+// TestInstanceTruncatingOrder verifies that nodes over maxIGSize are truncated starting from the last one (alphabetically).
+func TestInstanceTruncatingOrder(t *testing.T) {
+
+	maxIGSize := 3
+	gceNodesZoneA := []string{"d-node", "c-node", "b-node", "a-node"}
+	kubeNodesZoneA := []string{"d-node", "c-node", "b-node", "a-node"}
+
+	igName := defaultNamer.InstanceGroup()
+	fakeGCEInstanceGroups := NewFakeInstanceGroups(map[string]IGsToInstances{}, maxIGSize)
+	pool := newNodePool(fakeGCEInstanceGroups, maxIGSize)
+	manager := pool.(*manager)
+	zonegetter.AddFakeNodes(manager.ZoneGetter, testZoneA, gceNodesZoneA...)
+
+	ports := []int64{80}
+	_, err := pool.EnsureInstanceGroupsAndPorts(igName, ports)
+	if err != nil {
+		t.Fatalf("pool.EnsureInstanceGroupsAndPorts(%s, %v) returned error %v, want nil", igName, ports, err)
+	}
+
+	// Execute manager's main instance group sync function
+	err = pool.Sync(kubeNodesZoneA)
+	if err != nil {
+		t.Fatalf("pool.Sync(_) returned error %v, want nil", err)
+	}
+
+	numberOfIGsInZone := len(fakeGCEInstanceGroups.zonesToIGsToInstances[testZoneA])
+	if numberOfIGsInZone != 1 {
+		t.Errorf("Unexpected instance group added, got %v, want: 1", numberOfIGsInZone)
+	}
+	for _, instancesSet := range fakeGCEInstanceGroups.zonesToIGsToInstances[testZoneA] {
+		if instancesSet.Has("d-node") {
+			t.Errorf("Last nodes (alphabetically) should be truncated first.")
+		}
+
+	}
+}
+
+func getNodeSlice(prefix string, size int) sets.String {
+	nodes := make([]string, size)
+	for i := 0; i < size; i++ {
+		nodes[i] = fmt.Sprintf("%s-%d", prefix, i)
+	}
+	return sets.NewString(nodes...)
+}
+
 func TestSetNamedPorts(t *testing.T) {
 	maxIGSize := 1000
 	zonesToIGs := map[string]IGsToInstances{
@@ -218,7 +361,7 @@ func TestSetNamedPorts(t *testing.T) {
 		},
 	}
 	fakeIGs := NewFakeInstanceGroups(zonesToIGs, maxIGSize)
-	pool := newNodePool(fakeIGs, defaultTestZone, maxIGSize)
+	pool := newNodePool(fakeIGs, maxIGSize)
 	manager := pool.(*manager)
 	zonegetter.AddFakeNodes(manager.ZoneGetter, defaultTestZone, "test-node")
@@ -275,7 +418,7 @@ func TestGetInstanceReferences(t *testing.T) {
 			&compute.InstanceGroup{Name: "ig"}: sets.NewString("ig"),
 		},
 	}
-	pool := newNodePool(NewFakeInstanceGroups(zonesToIGs, maxIGSize), defaultTestZone, maxIGSize)
+	pool := newNodePool(NewFakeInstanceGroups(zonesToIGs, maxIGSize), maxIGSize)
 	instances := pool.(*manager)
 	nodeNames := []string{"node-1", "node-2", "node-3", "node-4.region.zone"}
diff --git a/pkg/l4lb/l4netlbcontroller_test.go b/pkg/l4lb/l4netlbcontroller_test.go
index e6ae9c2b3f..29a99ee5c2 100644
--- a/pkg/l4lb/l4netlbcontroller_test.go
+++ b/pkg/l4lb/l4netlbcontroller_test.go
@@ -770,17 +770,23 @@ func TestProcessServiceCreationFailed(t *testing.T) {
 	for _, param := range []struct {
 		addMockFunc   func(*cloud.MockGCE)
 		expectedError string
-	}{{addMockFunc: func(c *cloud.MockGCE) { c.MockInstanceGroups.GetHook = test.GetErrorInstanceGroupHook },
-		expectedError: "lc.instancePool.EnsureInstanceGroupsAndPorts(k8s-ig--aaaaa, []) returned error GetErrorInstanceGroupHook"},
-		{addMockFunc: func(c *cloud.MockGCE) { c.MockInstanceGroups.ListHook = test.ListErrorHook },
-			expectedError: "ListErrorHook"},
-		{addMockFunc: func(c *cloud.MockGCE) { c.MockInstanceGroups.InsertHook = test.InsertErrorHook },
-			expectedError: "lc.instancePool.EnsureInstanceGroupsAndPorts(k8s-ig--aaaaa, []) returned error InsertErrorHook"},
-
-		{addMockFunc: func(c *cloud.MockGCE) { c.MockInstanceGroups.AddInstancesHook = test.AddInstancesErrorHook },
-			expectedError: "AddInstances: [AddInstancesErrorHook]"},
-		{addMockFunc: func(c *cloud.MockGCE) { c.MockInstanceGroups.ListInstancesHook = test.ListInstancesWithErrorHook },
-			expectedError: "ListInstancesWithErrorHook"},
+	}{
+		{
+			addMockFunc:   func(c *cloud.MockGCE) { c.MockInstanceGroups.GetHook = test.GetErrorInstanceGroupHook },
+			expectedError: "lc.instancePool.EnsureInstanceGroupsAndPorts(k8s-ig--aaaaa, []) returned error GetErrorInstanceGroupHook",
+		},
+		{
+			addMockFunc:   func(c *cloud.MockGCE) { c.MockInstanceGroups.InsertHook = test.InsertErrorHook },
+			expectedError: "lc.instancePool.EnsureInstanceGroupsAndPorts(k8s-ig--aaaaa, []) returned error InsertErrorHook",
+		},
+		{
+			addMockFunc:   func(c *cloud.MockGCE) { c.MockInstanceGroups.AddInstancesHook = test.AddInstancesErrorHook },
+			expectedError: "AddInstancesErrorHook",
+		},
+		{
+			addMockFunc:   func(c *cloud.MockGCE) { c.MockInstanceGroups.ListInstancesHook = test.ListInstancesWithErrorHook },
+			expectedError: "ListInstancesWithErrorHook",
+		},
 	} {
 		lc := newL4NetLBServiceController()
 		param.addMockFunc((lc.ctx.Cloud.Compute().(*cloud.MockGCE)))
@@ -789,7 +795,7 @@ func TestProcessServiceCreationFailed(t *testing.T) {
 		key, _ := common.KeyFunc(svc)
 		err := lc.sync(key, klog.TODO())
 		if err == nil || err.Error() != param.expectedError {
-			t.Errorf("Error mismatch '%v' != '%v'", err, param.expectedError)
+			t.Errorf("Error mismatch, got: '%v', want: '%v'", err, param.expectedError)
 		}
 	}
 }