From 9fb6df35efba0c5d66c63d358d2b5604aae4372f Mon Sep 17 00:00:00 2001 From: David Cheung Date: Wed, 17 Jul 2024 02:21:04 +0000 Subject: [PATCH] Populate State and Subnet for NEG reference in NEG CR. * Populate Subnet in NEG CR based on the subnet on the NEG. * Populate State as ACTIVE for newly created NEGs. For NEGs in the zones which don't have any nodes, set their state as INACTIVE. --- pkg/neg/syncers/transaction.go | 34 ++++- pkg/neg/syncers/transaction_test.go | 185 +++++++++++++++++++++++++--- pkg/neg/syncers/utils.go | 2 + pkg/neg/syncers/utils_test.go | 4 + 4 files changed, 206 insertions(+), 19 deletions(-) diff --git a/pkg/neg/syncers/transaction.go b/pkg/neg/syncers/transaction.go index 31737683f6..136c521a89 100644 --- a/pkg/neg/syncers/transaction.go +++ b/pkg/neg/syncers/transaction.go @@ -758,8 +758,12 @@ func (s *transactionSyncer) logEndpoints(endpointMap map[string]negtypes.Network s.logger.V(3).Info("Endpoints for NEG", "description", desc, "endpointMap", endpointMap) } -// updateInitStatus queries the k8s api server for the current NEG CR and updates the Initialized condition and neg objects as appropriate. -// If neg client is nil, will return immediately +// updateInitStatus takes in the NEG refs based on the existing node zones, +// then queries the k8s api server for the current NEG CR and updates the +// Initialized condition and neg objects as appropriate. +// Before patching the NEG CR, it also includes NEG refs for NEGs that are no +// longer needed and marks their state as INACTIVE. +// If neg client is nil, will return immediately. 
func (s *transactionSyncer) updateInitStatus(negObjRefs []negv1beta1.NegObjectReference, errList []error) { if s.svcNegClient == nil { return @@ -775,6 +779,8 @@ func (s *transactionSyncer) updateInitStatus(negObjRefs []negv1beta1.NegObjectRe neg := origNeg.DeepCopy() if len(negObjRefs) != 0 { + inactiveNegObjRefs := getInactiveNegRefs(origNeg.Status.NetworkEndpointGroups, negObjRefs) + negObjRefs = append(negObjRefs, inactiveNegObjRefs...) neg.Status.NetworkEndpointGroups = negObjRefs } @@ -909,6 +915,30 @@ func ensureCondition(neg *negv1beta1.ServiceNetworkEndpointGroup, expectedCondit return expectedCondition } +// getInactiveNegRefs creates NEG references for NEGs in Inactive State. +// Inactive NEG are NEGs that are no longer needed. +func getInactiveNegRefs(origNegRefs []negv1beta1.NegObjectReference, updatedNegRefs []negv1beta1.NegObjectReference) []negv1beta1.NegObjectReference { + activeNegLinks := sets.New[string]() + for _, negRef := range updatedNegRefs { + activeNegLinks.Insert(negRef.SelfLink) + } + + var inactiveNegRefs []negv1beta1.NegObjectReference + for _, origNegRef := range origNegRefs { + // NEGs are listed based on the current node zones. If a NEG no longer + // exists in the current list, it means there are no nodes/endpoints + // in that specific zone, and we mark it as INACTIVE. + // We use SelfLink as identifier since it contains the unique NEG zone + // and name pair. 
+ if !activeNegLinks.Has(origNegRef.SelfLink) { + inactiveNegRef := origNegRef.DeepCopy() + inactiveNegRef.State = negv1beta1.InactiveState + inactiveNegRefs = append(inactiveNegRefs, *inactiveNegRef) + } + } + return inactiveNegRefs +} + // getSyncedCondition returns the expected synced condition based on given error func getSyncedCondition(err error) negv1beta1.Condition { if err != nil { diff --git a/pkg/neg/syncers/transaction_test.go b/pkg/neg/syncers/transaction_test.go index 260bdd6fc8..bd0b8942ed 100644 --- a/pkg/neg/syncers/transaction_test.go +++ b/pkg/neg/syncers/transaction_test.go @@ -1172,7 +1172,7 @@ func TestTransactionSyncerWithNegCR(t *testing.T) { // fakeZoneGetter will list 3 zones for VM_IP_PORT NEGs. expectZones := sets.NewString(negtypes.TestZone1, negtypes.TestZone2, negtypes.TestZone4) - var expectedNegRefs map[string]negv1beta1.NegObjectReference + expectedNegRefs := make(map[string]negv1beta1.NegObjectReference) if tc.negExists { for zone := range expectZones { fakeCloud.CreateNetworkEndpointGroup(&composite.NetworkEndpointGroup{ @@ -1185,7 +1185,10 @@ func TestTransactionSyncerWithNegCR(t *testing.T) { }, zone, klog.TODO()) } ret, _ := fakeCloud.AggregatedListNetworkEndpointGroup(syncer.NegSyncerKey.GetAPIVersion(), klog.TODO()) - expectedNegRefs = negObjectReferences(ret) + for _, neg := range ret { + negRef := getNegObjectReference(neg, negv1beta1.ActiveState) + expectedNegRefs[negRef.SelfLink] = negRef + } } var refs []negv1beta1.NegObjectReference if tc.crStatusPopulated { @@ -1217,14 +1220,17 @@ func TestTransactionSyncerWithNegCR(t *testing.T) { } ret, _ := fakeCloud.AggregatedListNetworkEndpointGroup(syncer.NegSyncerKey.GetAPIVersion(), klog.TODO()) if len(expectedNegRefs) == 0 && !tc.expectErr { - expectedNegRefs = negObjectReferences(ret) + for _, neg := range ret { + negRef := getNegObjectReference(neg, negv1beta1.ActiveState) + expectedNegRefs[negRef.SelfLink] = negRef + } } // if error occurs, expect that neg object 
references are not populated if tc.expectErr && !tc.crStatusPopulated { expectedNegRefs = nil } - checkNegCR(t, negCR, creationTS, expectZones, expectedNegRefs, false, tc.expectErr) + checkNegCR(t, negCR, creationTS, expectedNegRefs, false) if tc.expectErr { // If status is already populated, expect no change even when error occurs checkCondition(t, negCR.Status.Conditions, negv1beta1.Initialized, creationTS, corev1.ConditionFalse, true) @@ -1266,6 +1272,147 @@ func TestTransactionSyncerWithNegCR(t *testing.T) { } } +func TestUpdateInitStatus(t *testing.T) { + t.Parallel() + testNetwork := cloud.ResourcePath("network", &meta.Key{Name: "test-network"}) + testSubnetwork := cloud.ResourcePath("subnetwork", &meta.Key{Name: "test-subnetwork"}) + testNegType := negtypes.VmIpPortEndpointType + + testCases := []struct { + desc string + initialActiveZones sets.String + initialInactiveZones sets.String + updatedActiveZones sets.String + updatedInactiveZones sets.String + }{ + { + desc: "Add a zone while the NEG CR have two ACTIVE NEG ref, a NEG ref in the specific zone should be added to NEG CR with ACTIVE status", + initialActiveZones: sets.NewString(negtypes.TestZone1, negtypes.TestZone2), + updatedActiveZones: sets.NewString(negtypes.TestZone1, negtypes.TestZone2, negtypes.TestZone4), + }, + { + desc: "Removed a zone while the NEG CR have two ACTIVE NEG ref, the NEG ref in the removed zone should also be in NEG CR with INACTIVE status", + initialActiveZones: sets.NewString(negtypes.TestZone1, negtypes.TestZone2), + updatedActiveZones: sets.NewString(negtypes.TestZone1), + updatedInactiveZones: sets.NewString(negtypes.TestZone2), + }, + { + desc: "Add back a zone which was previously INACTIVE, the NEG ref in this zone should become ACTIVE in NEG CR", + initialActiveZones: sets.NewString(negtypes.TestZone1), + initialInactiveZones: sets.NewString(negtypes.TestZone2), + updatedActiveZones: sets.NewString(negtypes.TestZone1, negtypes.TestZone2), + }, + { + desc: "Remove 
another zone while the NEG CR already has an INACTIVE NEG ref, NEG CR should have two INACTIVE NEG ref", + initialActiveZones: sets.NewString(negtypes.TestZone1, negtypes.TestZone2), + initialInactiveZones: sets.NewString(negtypes.TestZone4), + updatedActiveZones: sets.NewString(negtypes.TestZone1), + updatedInactiveZones: sets.NewString(negtypes.TestZone2, negtypes.TestZone4), + }, + } + + for _, tc := range testCases { + t.Run(tc.desc, func(t *testing.T) { + t.Parallel() + + fakeCloud := negtypes.NewFakeNetworkEndpointGroupCloud(testSubnetwork, testNetwork) + _, syncer := newTestTransactionSyncer(fakeCloud, testNegType, false) + svcNegClient := syncer.svcNegClient + + var initialNegList []negv1beta1.NegObjectReference + initialZones := tc.initialActiveZones.Union(tc.initialInactiveZones) + for zone := range initialZones { + err := fakeCloud.CreateNetworkEndpointGroup(&composite.NetworkEndpointGroup{ + Version: syncer.NegSyncerKey.GetAPIVersion(), + Name: testNegName, + NetworkEndpointType: string(syncer.NegSyncerKey.NegType), + Network: fakeCloud.NetworkURL(), + Subnetwork: fakeCloud.SubnetworkURL(), + }, zone, klog.TODO()) + if err != nil { + t.Fatalf("Failed to create NEG %s in zone %s: %v", testNegName, zone, err) + } + neg, err := fakeCloud.GetNetworkEndpointGroup(testNegName, zone, meta.VersionGA, klog.TODO()) + if err != nil { + t.Fatalf("Failed to get NEG %s in zone %s: %v", testNegName, zone, err) + } + negRef := negv1beta1.NegObjectReference{ + Id: fmt.Sprint(neg.Id), + SelfLink: neg.SelfLink, + NetworkEndpointType: negv1beta1.NetworkEndpointType(neg.NetworkEndpointType), + State: negv1beta1.ActiveState, + SubnetURL: neg.Subnetwork, + } + if tc.initialInactiveZones.Has(zone) { + negRef.State = negv1beta1.InactiveState + } + initialNegList = append(initialNegList, negRef) + } + + creationTS := v1.Now() + origCR := createNegCR(testNegName, creationTS, true, true, initialNegList) + svcNeg, err := 
svcNegClient.NetworkingV1beta1().ServiceNetworkEndpointGroups(testServiceNamespace).Create(context2.Background(), origCR, v1.CreateOptions{}) + if err != nil { + t.Errorf("Failed to create test NEG CR: %s", err) + } + syncer.svcNegLister.Add(svcNeg) + + // Create a NEG in a new zone if zone expanded. + addedZones := tc.updatedActiveZones.Difference(tc.initialActiveZones.Union(tc.initialInactiveZones)) + if addedZones != nil { + for zone := range addedZones { + err := fakeCloud.CreateNetworkEndpointGroup(&composite.NetworkEndpointGroup{ + Version: syncer.NegSyncerKey.GetAPIVersion(), + Name: testNegName, + NetworkEndpointType: string(syncer.NegSyncerKey.NegType), + Network: fakeCloud.NetworkURL(), + Subnetwork: fakeCloud.SubnetworkURL(), + }, zone, klog.TODO()) + if err != nil { + t.Fatalf("Failed to create NEG %s in zone %s: %v", testNegName, zone, err) + } + } + } + + // This is the input list to updateInitStatus(). + // It should only include NEG ref in the active zones. + var activeNegList []negv1beta1.NegObjectReference + + // This is the expected NEG ref after updateInitStatus(). + // It should also include NEG ref in the inactive zones. 
+ expectedNegRefs := make(map[string]negv1beta1.NegObjectReference) + + for zone := range tc.updatedActiveZones { + neg, err := fakeCloud.GetNetworkEndpointGroup(testNegName, zone, meta.VersionGA, klog.TODO()) + if err != nil { + t.Fatalf("Failed to get NEG %s in zone %s: %v", testNegName, zone, err) + } + negRef := getNegObjectReference(neg, negv1beta1.ActiveState) + activeNegList = append(activeNegList, negRef) + expectedNegRefs[negRef.SelfLink] = negRef + } + + for zone := range tc.updatedInactiveZones { + neg, err := fakeCloud.GetNetworkEndpointGroup(testNegName, zone, meta.VersionGA, klog.TODO()) + if err != nil { + t.Errorf("Failed to get test NEG: %s", err) + } + negRef := getNegObjectReference(neg, negv1beta1.InactiveState) + expectedNegRefs[negRef.SelfLink] = negRef + } + // Inactive NEG refs should be added if there is any. + syncer.updateInitStatus(activeNegList, nil) + + negCR, err := svcNegClient.NetworkingV1beta1().ServiceNetworkEndpointGroups(testServiceNamespace).Get(context2.Background(), testNegName, v1.GetOptions{}) + if err != nil { + t.Errorf("Failed to create test NEG CR: %s", err) + } + + checkNegCR(t, negCR, creationTS, expectedNegRefs, false) + }) + } +} + func TestUpdateStatus(t *testing.T) { testNetwork := cloud.ResourcePath("network", &meta.Key{Name: "test-network"}) testSubnetwork := cloud.ResourcePath("subnetwork", &meta.Key{Name: "test-subnetwork"}) @@ -1460,7 +1607,13 @@ func TestIsZoneChange(t *testing.T) { }, zone, klog.TODO()) } ret, _ := fakeCloud.AggregatedListNetworkEndpointGroup(syncer.NegSyncerKey.GetAPIVersion(), klog.TODO()) - negRefMap := negObjectReferences(ret) + negRefMap := make(map[string]negv1beta1.NegObjectReference) + + for _, neg := range ret { + negRef := getNegObjectReference(neg, negv1beta1.ActiveState) + negRefMap[negRef.SelfLink] = negRef + } + var refs []negv1beta1.NegObjectReference for _, neg := range negRefMap { refs = append(refs, neg) @@ -2430,18 +2583,16 @@ func waitForTransactions(syncer 
*transactionSyncer) error { }) } -// negObjectReferences returns objectReferences for NEG CRs from NEG Objects -func negObjectReferences(negs map[*meta.Key]*composite.NetworkEndpointGroup) map[string]negv1beta1.NegObjectReference { - - negObjs := make(map[string]negv1beta1.NegObjectReference) - for _, neg := range negs { - negObjs[neg.SelfLink] = negv1beta1.NegObjectReference{ - Id: fmt.Sprint(neg.Id), - SelfLink: neg.SelfLink, - NetworkEndpointType: negv1beta1.NetworkEndpointType(neg.NetworkEndpointType), - } +// getNegObjectReference returns objectReference for NEG CRs from NEG Object +func getNegObjectReference(neg *composite.NetworkEndpointGroup, negState negv1beta1.NegState) negv1beta1.NegObjectReference { + return negv1beta1.NegObjectReference{ + Id: fmt.Sprint(neg.Id), + SelfLink: neg.SelfLink, + NetworkEndpointType: negv1beta1.NetworkEndpointType(neg.NetworkEndpointType), + State: negState, + SubnetURL: neg.Subnetwork, } - return negObjs + } // checks the NEG Description on the cloud NEG Object and verifies with expected @@ -2540,7 +2691,7 @@ func createNegCR(testNegName string, creationTS metav1.Time, populateInitialized } // checkNegCR validates the NegObjectReferences and the LastSyncTime. 
It will not validate the conditions fields but ensures at most 2 conditions exist -func checkNegCR(t *testing.T, negCR *negv1beta1.ServiceNetworkEndpointGroup, previousLastSyncTime metav1.Time, expectZones sets.String, expectedNegRefs map[string]negv1beta1.NegObjectReference, expectSyncTimeUpdate, expectErr bool) { +func checkNegCR(t *testing.T, negCR *negv1beta1.ServiceNetworkEndpointGroup, previousLastSyncTime metav1.Time, expectedNegRefs map[string]negv1beta1.NegObjectReference, expectSyncTimeUpdate bool) { if expectSyncTimeUpdate && !previousLastSyncTime.Before(&negCR.Status.LastSyncTime) { t.Errorf("Expected Neg CR to have an updated LastSyncTime") } else if !expectSyncTimeUpdate && !negCR.Status.LastSyncTime.IsZero() && !previousLastSyncTime.Equal(&negCR.Status.LastSyncTime) { diff --git a/pkg/neg/syncers/utils.go b/pkg/neg/syncers/utils.go index 96e75c54cb..3cb3ee3c14 100644 --- a/pkg/neg/syncers/utils.go +++ b/pkg/neg/syncers/utils.go @@ -226,6 +226,8 @@ func ensureNetworkEndpointGroup(svcNamespace, svcName, negName, zone, negService Id: fmt.Sprint(neg.Id), SelfLink: neg.SelfLink, NetworkEndpointType: negv1beta1.NetworkEndpointType(neg.NetworkEndpointType), + State: negv1beta1.ActiveState, + SubnetURL: neg.Subnetwork, } return negRef, nil } diff --git a/pkg/neg/syncers/utils_test.go b/pkg/neg/syncers/utils_test.go index 7e3d18d157..011a051631 100644 --- a/pkg/neg/syncers/utils_test.go +++ b/pkg/neg/syncers/utils_test.go @@ -1342,6 +1342,10 @@ func TestNegObjectCrd(t *testing.T) { Id: fmt.Sprint(neg.Id), SelfLink: neg.SelfLink, NetworkEndpointType: negv1beta1.NetworkEndpointType(networkEndpointType), + State: negv1beta1.ActiveState, + } + if networkEndpointType != negtypes.NonGCPPrivateEndpointType { + expectedNegObj.SubnetURL = testSubnetwork } if negObj != expectedNegObj {