Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update NEG reference in NEG CR #2604

Merged
merged 2 commits into from
Sep 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 50 additions & 4 deletions pkg/neg/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -616,6 +616,9 @@ func (manager *syncerManager) processNEGDeletionCandidate(svcNegCR *negv1beta1.S
var errList []error
shouldDeleteNegCR := true
deleteByZone := len(svcNegCR.Status.NetworkEndpointGroups) == 0
// Change this to a map from NEG name to sets once we allow multiple NEGs
// in a specific zone(multi-subnet cluster).
deletedNegs := make(map[negtypes.NegInfo]struct{})

for _, negRef := range svcNegCR.Status.NetworkEndpointGroups {
resourceID, err := cloud.ParseResourceURL(negRef.SelfLink)
Expand All @@ -624,14 +627,34 @@ func (manager *syncerManager) processNEGDeletionCandidate(svcNegCR *negv1beta1.S
deleteByZone = true
continue
}

shouldDeleteNegCR = shouldDeleteNegCR && manager.deleteNegOrReportErr(resourceID.Key.Name, resourceID.Key.Zone, svcNegCR, &errList)
negDeleted := manager.deleteNegOrReportErr(resourceID.Key.Name, resourceID.Key.Zone, svcNegCR, &errList)
if negDeleted {
deletedNegs[negtypes.NegInfo{Name: resourceID.Key.Name, Zone: resourceID.Key.Zone}] = struct{}{}
}
shouldDeleteNegCR = shouldDeleteNegCR && negDeleted
}

if deleteByZone {
manager.logger.V(2).Info("Deletion candidate has 0 NEG reference", "svcneg", klog.KObj(svcNegCR), "svcNegCR", svcNegCR)
for _, zone := range zones {
shouldDeleteNegCR = shouldDeleteNegCR && manager.deleteNegOrReportErr(svcNegCR.Name, zone, svcNegCR, &errList)
negDeleted := manager.deleteNegOrReportErr(svcNegCR.Name, zone, svcNegCR, &errList)
if negDeleted {
deletedNegs[negtypes.NegInfo{Name: svcNegCR.Name, Zone: zone}] = struct{}{}
}
shouldDeleteNegCR = shouldDeleteNegCR && negDeleted
}
}
// Since no more NEG deletion will be happening at this point, and NEG
// CR will not be deleted, clear the reference for deleted NEGs in the
// NEG CR.
if len(deletedNegs) != 0 {
updatedCR := svcNegCR.DeepCopy()

if errs := ensureExistingNegRef(updatedCR, deletedNegs); len(errs) != 0 {
errList = append(errList, errs...)
}
if _, err := patchNegStatus(manager.svcNegClient, *svcNegCR, *updatedCR); err != nil {
errList = append(errList, err)
}
}

Expand All @@ -655,9 +678,13 @@ func (manager *syncerManager) processNEGDeletionCandidate(svcNegCR *negv1beta1.S
}

manager.logger.V(2).Info("Deleting NEG CR", "svcneg", klog.KObj(svcNegCR))
if err := deleteSvcNegCR(manager.svcNegClient, svcNegCR, manager.logger); err != nil {
err := deleteSvcNegCR(manager.svcNegClient, svcNegCR, manager.logger)
if err != nil {
manager.logger.V(2).Error(err, "Error when deleting NEG CR", "svcneg", klog.KObj(svcNegCR))
errList = append(errList, err)
return
}
manager.logger.V(2).Info("Deleted NEG CR", "svcneg", klog.KObj(svcNegCR))
}()

return errList
Expand Down Expand Up @@ -685,6 +712,25 @@ func (manager *syncerManager) deleteNegOrReportErr(name, zone string, svcNegCR *
return true
}

// ensureExistingNegRef removes NEG refs in NEG CR for NEGs that have been
// deleted successfully.
func ensureExistingNegRef(neg *negv1beta1.ServiceNetworkEndpointGroup, deletedNegs map[negtypes.NegInfo]struct{}) []error {
var updatedNegRef []negv1beta1.NegObjectReference
var errList []error
for _, negRef := range neg.Status.NetworkEndpointGroups {
negInfo, err := negtypes.NegInfoFromNegRef(negRef)
if err != nil {
errList = append(errList, err)
continue
}
if _, exists := deletedNegs[negInfo]; !exists {
updatedNegRef = append(updatedNegRef, negRef)
}
}
neg.Status.NetworkEndpointGroups = updatedNegRef
return errList
}

// ensureDeleteNetworkEndpointGroup ensures neg is delete from zone
func (manager *syncerManager) ensureDeleteNetworkEndpointGroup(name, zone string, expectedDesc *utils.NegDescription) error {
neg, err := manager.cloud.GetNetworkEndpointGroup(name, zone, meta.VersionGA, manager.logger)
Expand Down
76 changes: 59 additions & 17 deletions pkg/neg/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,18 +35,21 @@ import (
apiv1 "k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
apitypes "k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/fake"
k8stesting "k8s.io/client-go/testing"
"k8s.io/client-go/tools/record"
negv1beta1 "k8s.io/ingress-gce/pkg/apis/svcneg/v1beta1"
"k8s.io/ingress-gce/pkg/neg/metrics/metricscollector"
"k8s.io/ingress-gce/pkg/neg/syncers/labels"
"k8s.io/ingress-gce/pkg/neg/types"
negtypes "k8s.io/ingress-gce/pkg/neg/types"
svcnegclient "k8s.io/ingress-gce/pkg/svcneg/client/clientset/versioned"
negfake "k8s.io/ingress-gce/pkg/svcneg/client/clientset/versioned/fake"
"k8s.io/ingress-gce/pkg/utils/common"
namer_util "k8s.io/ingress-gce/pkg/utils/namer"
"k8s.io/ingress-gce/pkg/utils/zonegetter"
Expand Down Expand Up @@ -1189,7 +1192,10 @@ func TestGarbageCollectionNegCrdEnabled(t *testing.T) {
expectNegGC bool
expectCrGC bool
expectErr bool
gcError error
negGCError error
negGCErrorZone []string
negCrGCError error
expectedNegCount int

// expectGenNamedNegGC indicates that the Neg GC only occurs if using a generated name
// expectNegGC will take precedence over this value
Expand Down Expand Up @@ -1250,6 +1256,7 @@ func TestGarbageCollectionNegCrdEnabled(t *testing.T) {
expectNegGC: false,
expectCrGC: true,
negDesc: wrongDesc.String(),
expectedNegCount: 2,
},
{desc: "neg config not in svcPortMap, empty neg list, neg has empty description",
negsExist: true,
Expand All @@ -1258,20 +1265,23 @@ func TestGarbageCollectionNegCrdEnabled(t *testing.T) {
expectGenNamedNegGC: true,
expectCrGC: true,
negDesc: "",
expectedNegCount: 2,
},
{desc: "neg config in svcPortMap, marked for deletion",
negsExist: true,
markedForDeletion: true,
desiredConfig: true,
expectNegGC: false,
expectCrGC: false,
expectedNegCount: 2,
},
{desc: "neg config in svcPortMap",
negsExist: true,
markedForDeletion: false,
desiredConfig: true,
expectNegGC: false,
expectCrGC: false,
expectedNegCount: 2,
},
{desc: "negs don't exist, config not in svcPortMap",
negsExist: false,
Expand All @@ -1288,16 +1298,38 @@ func TestGarbageCollectionNegCrdEnabled(t *testing.T) {
markedForDeletion: true,
expectCrGC: true,
expectErr: false,
gcError: &googleapi.Error{Code: http.StatusBadRequest},
negGCError: &googleapi.Error{Code: http.StatusBadRequest},
negGCErrorZone: []string{negtypes.TestZone1, negtypes.TestZone2},
negDesc: matchingDesc.String(),
},
{desc: "error during neg gc, config not in svcPortMap",
{desc: "error on all NEG deletions during neg gc, config not in svcPortMap, NEG CR should still have all NEG ref",
negsExist: true,
markedForDeletion: true,
expectCrGC: true,
expectErr: true,
gcError: fmt.Errorf("gc-error"),
negGCError: fmt.Errorf("neg-gc-error"),
negGCErrorZone: []string{negtypes.TestZone1, negtypes.TestZone2},
negDesc: matchingDesc.String(),
expectedNegCount: 2,
},
{desc: "error on one NEG deletion during neg gc, config not in svcPortMap, NEG CR should not have the deleted NEG ref",
negsExist: true,
markedForDeletion: true,
expectCrGC: true,
expectErr: true,
negGCError: fmt.Errorf("neg-gc-error"),
negGCErrorZone: []string{negtypes.TestZone1},
negDesc: matchingDesc.String(),
expectedNegCount: 1,
},
{desc: "error when deleting NEG CR during neg gc, config not in svcPortMap, NEG CR should not have any stale NEG ref",
negsExist: true,
markedForDeletion: false, // Make sure deletion timestamp is not set so we will trigger error when delete NEG CR
expectErr: true,
expectCrGC: false, // NEG CR deletion should fail due to error from deletion API call
negCrGCError: fmt.Errorf("neg-cr-gc-error"),
negDesc: matchingDesc.String(),
expectedNegCount: 0,
},
}

Expand All @@ -1310,6 +1342,13 @@ func TestGarbageCollectionNegCrdEnabled(t *testing.T) {
manager, testCloud := NewTestSyncerManager(kubeClient)
svcNegClient := manager.svcNegClient

if tc.negCrGCError != nil {
svcNegClientFake := svcNegClient.(*negfake.Clientset)
svcNegClientFake.PrependReactor("delete", "servicenetworkendpointgroups", func(action k8stesting.Action) (handled bool, ret runtime.Object, err error) {
return true, &negv1beta1.ServiceNetworkEndpointGroup{}, tc.negCrGCError
})
}

manager.serviceLister.Add(svc)
fakeNegCloud := manager.cloud

Expand Down Expand Up @@ -1349,7 +1388,7 @@ func TestGarbageCollectionNegCrdEnabled(t *testing.T) {
}

if _, err := manager.svcNegClient.NetworkingV1beta1().ServiceNetworkEndpointGroups(cr.Namespace).Create(context2.TODO(), &cr, metav1.CreateOptions{}); err != nil {
t.Fatalf("failed to create neg cr")
t.Fatalf("failed to create neg cr: %v", err)
}

crs := getNegCRs(t, svcNegClient, testServiceNamespace)
Expand All @@ -1370,19 +1409,20 @@ func TestGarbageCollectionNegCrdEnabled(t *testing.T) {
}
}

if tc.gcError != nil {
if tc.negGCError != nil {
mockCloud := testCloud.Compute().(*cloud.MockGCE)
mockNEG := mockCloud.NetworkEndpointGroups().(*cloud.MockNetworkEndpointGroups)

for _, zone := range []string{negtypes.TestZone1, negtypes.TestZone2} {
mockNEG.DeleteError[*meta.ZonalKey(negName, zone)] = tc.gcError
for _, zone := range tc.negGCErrorZone {
mockNEG.DeleteError[*meta.ZonalKey(negName, zone)] = tc.negGCError
}
}

err := manager.GC()
if !tc.expectErr && err != nil {
t.Fatalf("failed to GC: %v", err)
} else if tc.expectErr && err == nil {
}
if tc.expectErr && err == nil {
t.Errorf("expected GC to error")
}

Expand All @@ -1391,21 +1431,23 @@ func TestGarbageCollectionNegCrdEnabled(t *testing.T) {
t.Errorf("failed getting negs from cloud: %s", err)
}

numExistingNegs, negsDeleted := checkForNegDeletions(negs, negName)
numExistingNegs := checkForNegDeletions(negs, negName)

expectNegGC := tc.expectNegGC || (tc.expectGenNamedNegGC && !customName)
if tc.negsExist && expectNegGC && !negsDeleted {
if tc.negsExist && expectNegGC && numExistingNegs != 0 {
t.Errorf("expected negs to be GCed, but found %d", numExistingNegs)
} else if tc.negsExist && !expectNegGC && numExistingNegs != 2 {
t.Errorf("expected two negs in the cloud, but found %d", numExistingNegs)
}
if tc.negsExist && !expectNegGC && numExistingNegs != tc.expectedNegCount {
t.Errorf("expected %d negs in the cloud, but found %d", tc.expectedNegCount, numExistingNegs)
}

crs = getNegCRs(t, svcNegClient, testServiceNamespace)
crDeleted := checkForNegCRDeletion(crs, negName)

if tc.expectCrGC && !crDeleted {
t.Errorf("expected neg %s to be deleted", negName)
} else if !tc.expectCrGC && crDeleted && !tc.markedForDeletion {
}
if !tc.expectCrGC && crDeleted && !tc.markedForDeletion {
t.Errorf("expected neg %s to not be deleted", negName)
}
}
Expand Down Expand Up @@ -1559,16 +1601,16 @@ func getNegObjectRefs(t *testing.T, cloud negtypes.NetworkEndpointGroupCloud, zo
return negRefs
}

// checkForNegDeletions checks that negs does not have a neg with the provided negName. If none exists, returns true, otherwise returns false the number of negs found with the name
func checkForNegDeletions(negs map[*meta.Key]*composite.NetworkEndpointGroup, negName string) (int, bool) {
// checkForNegDeletions gets the count of neg objects in negs that has the provided negName.
func checkForNegDeletions(negs map[*meta.Key]*composite.NetworkEndpointGroup, negName string) int {
foundNegs := 0
for _, neg := range negs {
if neg.Name == negName {
foundNegs += 1
}
}

return foundNegs, foundNegs == 0
return foundNegs
}

// checkForNegCRDeletion verifies that either no cr with name `negName` exists or a cr withe name `negName` has its deletion timestamp set
Expand Down
45 changes: 43 additions & 2 deletions pkg/neg/syncers/transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -769,8 +769,12 @@ func (s *transactionSyncer) logEndpoints(endpointMap map[string]negtypes.Network
s.logger.V(3).Info("Endpoints for NEG", "description", desc, "endpointMap", endpointMap)
}

// updateInitStatus queries the k8s api server for the current NEG CR and updates the Initialized condition and neg objects as appropriate.
// If neg client is nil, will return immediately
// updateInitStatus takes in the NEG refs based on the existing node zones,
// then queries the k8s api server for the current NEG CR and updates the
// Initialized condition and neg objects as appropriate.
// Before patching the NEG CR, it also includes NEG refs for NEGs are no longer
// needed and change status as INACTIVE.
// If neg client is nil, will return immediately.
func (s *transactionSyncer) updateInitStatus(negObjRefs []negv1beta1.NegObjectReference, errList []error) {
if s.svcNegClient == nil {
return
Expand All @@ -784,6 +788,8 @@ func (s *transactionSyncer) updateInitStatus(negObjRefs []negv1beta1.NegObjectRe
}

neg := origNeg.DeepCopy()
inactiveNegObjRefs := getInactiveNegRefs(origNeg.Status.NetworkEndpointGroups, negObjRefs, s.logger)
negObjRefs = append(negObjRefs, inactiveNegObjRefs...)
neg.Status.NetworkEndpointGroups = negObjRefs

initializedCondition := getInitializedCondition(utilerrors.NewAggregate(errList))
Expand Down Expand Up @@ -917,6 +923,41 @@ func ensureCondition(neg *negv1beta1.ServiceNetworkEndpointGroup, expectedCondit
return expectedCondition
}

// getInactiveNegRefs creates NEG references for NEGs in Inactive State.
// Inactive NEG are NEGs that are no longer needed.
func getInactiveNegRefs(oldNegRefs []negv1beta1.NegObjectReference, currentNegRefs []negv1beta1.NegObjectReference, logger klog.Logger) []negv1beta1.NegObjectReference {
activeNegs := make(map[negtypes.NegInfo]struct{})
for _, negRef := range currentNegRefs {
negInfo, err := negtypes.NegInfoFromNegRef(negRef)
if err != nil {
logger.Error(err, "Failed to extract name and zone information of a neg from the current snapshot", "negId", negRef.Id, "negSelfLink", negRef.SelfLink)
continue
}
activeNegs[negInfo] = struct{}{}
}

var inactiveNegRefs []negv1beta1.NegObjectReference
for _, origNegRef := range oldNegRefs {
negInfo, err := negtypes.NegInfoFromNegRef(origNegRef)
if err != nil {
logger.Error(err, "Failed to extract name and zone information of a neg from the previous snapshot, skipping validating if it is an Inactive NEG", "negId", origNegRef.Id, "negSelfLink", origNegRef.SelfLink)
continue
sawsa307 marked this conversation as resolved.
Show resolved Hide resolved
}

// NEGs are listed based on the current node zones. If a NEG no longer
// exists in the current list, it means there are no nodes/endpoints
// in that specific zone, and we mark it as INACTIVE.
// We use SelfLink as identifier since it contains the unique NEG zone
// and name pair.
if _, exists := activeNegs[negInfo]; !exists {
inactiveNegRef := origNegRef.DeepCopy()
inactiveNegRef.State = negv1beta1.InactiveState
inactiveNegRefs = append(inactiveNegRefs, *inactiveNegRef)
}
}
return inactiveNegRefs
}

// getSyncedCondition returns the expected synced condition based on given error
func getSyncedCondition(err error) negv1beta1.Condition {
if err != nil {
Expand Down
Loading