Skip to content

Commit

Permalink
Populate State and Subnet for NEG reference in NEG CR.
Browse files Browse the repository at this point in the history
* Populate Subnet in NEG CR based on the subnet on the NEG.
* Populate State as ACTIVE for newly created NEGs. For NEGs in the zones
  which don't have any nodes, set their state as INACTIVE.
  • Loading branch information
sawsa307 committed Aug 2, 2024
1 parent d994735 commit 67b8e91
Show file tree
Hide file tree
Showing 5 changed files with 244 additions and 20 deletions.
43 changes: 41 additions & 2 deletions pkg/neg/syncers/transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -767,8 +767,12 @@ func (s *transactionSyncer) logEndpoints(endpointMap map[string]negtypes.Network
s.logger.V(3).Info("Endpoints for NEG", "description", desc, "endpointMap", endpointMap)
}

// updateInitStatus queries the k8s api server for the current NEG CR and updates the Initialized condition and neg objects as appropriate.
// If neg client is nil, will return immediately
// updateInitStatus takes in the NEG refs based on the existing node zones,
// then queries the k8s api server for the current NEG CR and updates the
// Initialized condition and neg objects as appropriate.
// Before patching the NEG CR, it also includes NEG refs for NEGs are no longer
// needed and change status as INACTIVE.
// If neg client is nil, will return immediately.
func (s *transactionSyncer) updateInitStatus(negObjRefs []negv1beta1.NegObjectReference, errList []error) {
if s.svcNegClient == nil {
return
Expand All @@ -782,6 +786,8 @@ func (s *transactionSyncer) updateInitStatus(negObjRefs []negv1beta1.NegObjectRe
}

neg := origNeg.DeepCopy()
inactiveNegObjRefs := getInactiveNegRefs(origNeg.Status.NetworkEndpointGroups, negObjRefs)
negObjRefs = append(negObjRefs, inactiveNegObjRefs...)
neg.Status.NetworkEndpointGroups = negObjRefs

initializedCondition := getInitializedCondition(utilerrors.NewAggregate(errList))
Expand Down Expand Up @@ -915,6 +921,39 @@ func ensureCondition(neg *negv1beta1.ServiceNetworkEndpointGroup, expectedCondit
return expectedCondition
}

// getInactiveNegRefs creates NEG references for NEGs in Inactive State.
// Inactive NEG are NEGs that are no longer needed.
func getInactiveNegRefs(oldNegRefs []negv1beta1.NegObjectReference, currentNegRefs []negv1beta1.NegObjectReference) []negv1beta1.NegObjectReference {
activeNegs := make(map[negtypes.NegInfo]struct{})
for _, negRef := range currentNegRefs {
negInfo, err := negtypes.NegInfoFromNegRef(negRef)
if err != nil {
continue
}
activeNegs[negInfo] = struct{}{}
}

var inactiveNegRefs []negv1beta1.NegObjectReference
for _, origNegRef := range oldNegRefs {
// NEGs are listed based on the current node zones. If a NEG no longer
// exists in the current list, it means there are no nodes/endpoints
// in that specific zone, and we mark it as INACTIVE.
// We use SelfLink as identifier since it contains the unique NEG zone
// and name pair.
negInfo, err := negtypes.NegInfoFromNegRef(origNegRef)
if err != nil {
continue
}

if _, exists := activeNegs[negInfo]; !exists {
inactiveNegRef := origNegRef.DeepCopy()
inactiveNegRef.State = negv1beta1.InactiveState
inactiveNegRefs = append(inactiveNegRefs, *inactiveNegRef)
}
}
return inactiveNegRefs
}

// getSyncedCondition returns the expected synced condition based on given error
func getSyncedCondition(err error) negv1beta1.Condition {
if err != nil {
Expand Down
193 changes: 177 additions & 16 deletions pkg/neg/syncers/transaction_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1179,6 +1179,7 @@ func TestTransactionSyncerWithNegCR(t *testing.T) {
expectZones := sets.NewString(negtypes.TestZone1, negtypes.TestZone2, negtypes.TestZone4)

var expectedNegRefs map[string]negv1beta1.NegObjectReference
var err error
if tc.negExists {
for zone := range expectZones {
fakeCloud.CreateNetworkEndpointGroup(&composite.NetworkEndpointGroup{
Expand All @@ -1190,8 +1191,10 @@ func TestTransactionSyncerWithNegCR(t *testing.T) {
Description: tc.negDesc,
}, zone, klog.TODO())
}
ret, _ := fakeCloud.AggregatedListNetworkEndpointGroup(syncer.NegSyncerKey.GetAPIVersion(), klog.TODO())
expectedNegRefs = negObjectReferences(ret)
expectedNegRefs, err = negObjectReferences(fakeCloud, negv1beta1.ActiveState, expectZones)
if err != nil {
t.Errorf("Failed to get negObjRef from NEG CR: %v", err)
}
}
var refs []negv1beta1.NegObjectReference
if tc.crStatusPopulated {
Expand Down Expand Up @@ -1222,16 +1225,19 @@ func TestTransactionSyncerWithNegCR(t *testing.T) {
if err != nil {
t.Errorf("Failed to get NEG from neg client: %s", err)
}
ret, _ := fakeCloud.AggregatedListNetworkEndpointGroup(syncer.NegSyncerKey.GetAPIVersion(), klog.TODO())
if !tc.expectErr {
expectedNegRefs = negObjectReferences(ret)
expectedNegRefs, err = negObjectReferences(fakeCloud, negv1beta1.ActiveState, expectZones)
if err != nil {
t.Errorf("Failed to get negObjRef from NEG CR: %v", err)
}
}
// if error occurs, expect that neg object references are not populated
if tc.expectErr && !tc.crStatusPopulated {
expectedNegRefs = nil
}

checkNegCR(t, negCR, creationTS, expectZones, expectedNegRefs, false, tc.expectErr)
expectPopulated := !tc.expectErr || tc.crStatusPopulated
checkNegCR(t, negCR, creationTS, expectZones, nil, expectPopulated, false, fakeCloud)
if tc.expectErr && tc.expectNoopOnNegStatus {
// If CR is populated, we should have initialized and synced condition
var expectedConditionLen int
Expand Down Expand Up @@ -1264,6 +1270,7 @@ func TestTransactionSyncerWithNegCR(t *testing.T) {
// Verify the NEGs are created as expected
retZones := sets.NewString()

ret, _ := fakeCloud.AggregatedListNetworkEndpointGroup(syncer.NegSyncerKey.GetAPIVersion(), klog.TODO())
for key, neg := range ret {
retZones.Insert(key.Zone)
if neg.Name != testNegName {
Expand All @@ -1288,6 +1295,127 @@ func TestTransactionSyncerWithNegCR(t *testing.T) {
}
}

func TestUpdateInitStatus(t *testing.T) {
t.Parallel()
testNetwork := cloud.ResourcePath("network", &meta.Key{Name: "test-network"})
testSubnetwork := cloud.ResourcePath("subnetwork", &meta.Key{Name: "test-subnetwork"})
testNegType := negtypes.VmIpPortEndpointType

// Active zones: zone1, zone2.
// Inactive zones: zone3
initialActiveZones := sets.NewString(negtypes.TestZone1, negtypes.TestZone2)
initialInactiveZones := sets.NewString(negtypes.TestZone3)

testCases := []struct {
desc string
updatedActiveZones sets.String
updatedInactiveZones sets.String
}{
{
desc: "Add a new zone zone4, an additional NEG ref should be added to NEG CR with ACTIVE status",
updatedActiveZones: sets.NewString(negtypes.TestZone1, negtypes.TestZone2, negtypes.TestZone4),
updatedInactiveZones: sets.NewString(negtypes.TestZone3),
},
{
desc: "Removed an ACTIVE zone zone3, corresponding NEG ref should still in NEG CR but with INACTIVE status",
updatedActiveZones: sets.NewString(negtypes.TestZone1),
updatedInactiveZones: sets.NewString(negtypes.TestZone2, negtypes.TestZone3),
},
{
desc: "Add back an INACTIVE zone zone3, the NEG ref in this zone should become ACTIVE in NEG CR",
updatedActiveZones: sets.NewString(negtypes.TestZone1, negtypes.TestZone2, negtypes.TestZone3),
},
}

for _, tc := range testCases {
t.Run(tc.desc, func(t *testing.T) {
t.Parallel()

fakeCloud := negtypes.NewFakeNetworkEndpointGroupCloud(testSubnetwork, testNetwork)
_, syncer := newTestTransactionSyncer(fakeCloud, testNegType, false)
svcNegClient := syncer.svcNegClient

// Create initial NEGs, and get their Object Ref to be used in NEG CR.
var initialNegRefs []negv1beta1.NegObjectReference
for zone := range initialActiveZones.Union(initialInactiveZones) {
err := fakeCloud.CreateNetworkEndpointGroup(&composite.NetworkEndpointGroup{
Version: syncer.NegSyncerKey.GetAPIVersion(),
Name: testNegName,
NetworkEndpointType: string(syncer.NegSyncerKey.NegType),
Network: fakeCloud.NetworkURL(),
Subnetwork: fakeCloud.SubnetworkURL(),
}, zone, klog.TODO())
if err != nil {
t.Fatalf("Failed to create NEG %s in zone %s: %v", testNegName, zone, err)
}
neg, err := fakeCloud.GetNetworkEndpointGroup(testNegName, zone, meta.VersionGA, klog.TODO())
if err != nil {
t.Fatalf("Failed to get NEG %s in zone %s: %v", testNegName, zone, err)
}
negRef := negv1beta1.NegObjectReference{
Id: fmt.Sprint(neg.Id),
SelfLink: neg.SelfLink,
NetworkEndpointType: negv1beta1.NetworkEndpointType(neg.NetworkEndpointType),
State: negv1beta1.ActiveState,
SubnetURL: neg.Subnetwork,
}
if initialInactiveZones.Has(zone) {
negRef.State = negv1beta1.InactiveState
}
initialNegRefs = append(initialNegRefs, negRef)
}

// Create NEG CR.
creationTS := v1.Now()
origCR := createNegCR(testNegName, creationTS, true, true, initialNegRefs)
svcNeg, err := svcNegClient.NetworkingV1beta1().ServiceNetworkEndpointGroups(testServiceNamespace).Create(context2.Background(), origCR, v1.CreateOptions{})
if err != nil {
t.Errorf("Failed to create test NEG CR: %s", err)
}
syncer.svcNegLister.Add(svcNeg)

// Create a NEG in a new zone if zone expanded.
addedZones := tc.updatedActiveZones.Difference(initialActiveZones.Union(initialInactiveZones))
if addedZones != nil {
for zone := range addedZones {
err := fakeCloud.CreateNetworkEndpointGroup(&composite.NetworkEndpointGroup{
Version: syncer.NegSyncerKey.GetAPIVersion(),
Name: testNegName,
NetworkEndpointType: string(syncer.NegSyncerKey.NegType),
Network: fakeCloud.NetworkURL(),
Subnetwork: fakeCloud.SubnetworkURL(),
}, zone, klog.TODO())
if err != nil {
t.Fatalf("Failed to create NEG %s in zone %s: %v", testNegName, zone, err)
}
}
}

// This is the input list to updateInitStatus().
// It should only include NEG ref in the active zones.
var activeNegList []negv1beta1.NegObjectReference
for zone := range tc.updatedActiveZones {
neg, err := fakeCloud.GetNetworkEndpointGroup(testNegName, zone, meta.VersionGA, klog.TODO())
if err != nil {
t.Fatalf("Failed to get NEG %s in zone %s: %v", testNegName, zone, err)
}
negRef := getNegObjectReference(neg, negv1beta1.ActiveState)
activeNegList = append(activeNegList, negRef)
}

// Inactive NEG refs should be added if there is any.
syncer.updateInitStatus(activeNegList, nil)

negCR, err := svcNegClient.NetworkingV1beta1().ServiceNetworkEndpointGroups(testServiceNamespace).Get(context2.Background(), testNegName, v1.GetOptions{})
if err != nil {
t.Errorf("Failed to create test NEG CR: %s", err)
}

checkNegCR(t, negCR, creationTS, tc.updatedActiveZones, tc.updatedInactiveZones, true, false, fakeCloud)
})
}
}

func TestUpdateStatus(t *testing.T) {
testNetwork := cloud.ResourcePath("network", &meta.Key{Name: "test-network"})
testSubnetwork := cloud.ResourcePath("subnetwork", &meta.Key{Name: "test-subnetwork"})
Expand Down Expand Up @@ -1481,8 +1609,11 @@ func TestIsZoneChange(t *testing.T) {
Subnetwork: fakeCloud.SubnetworkURL(),
}, zone, klog.TODO())
}
ret, _ := fakeCloud.AggregatedListNetworkEndpointGroup(syncer.NegSyncerKey.GetAPIVersion(), klog.TODO())
negRefMap := negObjectReferences(ret)
negRefMap, err := negObjectReferences(fakeCloud, negv1beta1.ActiveState, sets.NewString(origZones...))
if err != nil {
t.Errorf("Failed to get negObjRef from NEG CR: %v", err)
}

var refs []negv1beta1.NegObjectReference
for _, neg := range negRefMap {
refs = append(refs, neg)
Expand Down Expand Up @@ -2453,17 +2584,28 @@ func waitForTransactions(syncer *transactionSyncer) error {
}

// negObjectReferences returns objectReferences for NEG CRs from NEG Objects
func negObjectReferences(negs map[*meta.Key]*composite.NetworkEndpointGroup) map[string]negv1beta1.NegObjectReference {

func negObjectReferences(cloud negtypes.NetworkEndpointGroupCloud, state negv1beta1.NegState, zones sets.String) (map[string]negv1beta1.NegObjectReference, error) {
negObjs := make(map[string]negv1beta1.NegObjectReference)
for _, neg := range negs {
negObjs[neg.SelfLink] = negv1beta1.NegObjectReference{
Id: fmt.Sprint(neg.Id),
SelfLink: neg.SelfLink,
NetworkEndpointType: negv1beta1.NetworkEndpointType(neg.NetworkEndpointType),
for zone := range zones {
neg, err := cloud.GetNetworkEndpointGroup(testNegName, zone, meta.VersionGA, klog.TODO())
if err != nil {
return nil, err
}
negRef := getNegObjectReference(neg, state)
negObjs[neg.SelfLink] = negRef
}
return negObjs, nil
}

// getNegObjectReference returns objectReference for NEG CRs from NEG Object
func getNegObjectReference(neg *composite.NetworkEndpointGroup, negState negv1beta1.NegState) negv1beta1.NegObjectReference {
return negv1beta1.NegObjectReference{
Id: fmt.Sprint(neg.Id),
SelfLink: neg.SelfLink,
NetworkEndpointType: negv1beta1.NetworkEndpointType(neg.NetworkEndpointType),
State: negState,
SubnetURL: neg.Subnetwork,
}
return negObjs
}

// checks the NEG Description on the cloud NEG Object and verifies with expected
Expand Down Expand Up @@ -2562,13 +2704,32 @@ func createNegCR(testNegName string, creationTS metav1.Time, populateInitialized
}

// checkNegCR validates the NegObjectReferences and the LastSyncTime. It will not validate the conditions fields but ensures at most 2 conditions exist
func checkNegCR(t *testing.T, negCR *negv1beta1.ServiceNetworkEndpointGroup, previousLastSyncTime metav1.Time, expectZones sets.String, expectedNegRefs map[string]negv1beta1.NegObjectReference, expectSyncTimeUpdate, expectErr bool) {
func checkNegCR(t *testing.T, negCR *negv1beta1.ServiceNetworkEndpointGroup, previousLastSyncTime metav1.Time, activeZones, inactiveZones sets.String, expectNegCrPopulated, expectSyncTimeUpdate bool, cloud negtypes.NetworkEndpointGroupCloud) {
if expectSyncTimeUpdate && !previousLastSyncTime.Before(&negCR.Status.LastSyncTime) {
t.Errorf("Expected Neg CR to have an updated LastSyncTime")
} else if !expectSyncTimeUpdate && !negCR.Status.LastSyncTime.IsZero() && !previousLastSyncTime.Equal(&negCR.Status.LastSyncTime) {
t.Errorf("Expected Neg CR to not have an updated LastSyncTime")
}

expectedNegRefs := make(map[string]negv1beta1.NegObjectReference)

if expectNegCrPopulated {
ret, err := negObjectReferences(cloud, negv1beta1.ActiveState, activeZones)
if err != nil {
t.Fatalf("Failed to get negObjRef: %v", err)
}
for k, v := range ret {
expectedNegRefs[k] = v
}
ret, err = negObjectReferences(cloud, negv1beta1.InactiveState, inactiveZones)
if err != nil {
t.Fatalf("Failed to get negObjRef: %v", err)
}
for k, v := range ret {
expectedNegRefs[k] = v
}
}

var foundNegObjs []string
if len(negCR.Status.NetworkEndpointGroups) != len(expectedNegRefs) {
t.Errorf("Expected Neg CR to have %d corresponding neg object references, but has %d", len(expectedNegRefs), len(negCR.Status.NetworkEndpointGroups))
Expand Down
2 changes: 2 additions & 0 deletions pkg/neg/syncers/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,8 @@ func ensureNetworkEndpointGroup(svcNamespace, svcName, negName, zone, negService
Id: fmt.Sprint(neg.Id),
SelfLink: neg.SelfLink,
NetworkEndpointType: negv1beta1.NetworkEndpointType(neg.NetworkEndpointType),
State: negv1beta1.ActiveState,
SubnetURL: neg.Subnetwork,
}
return negRef, nil
}
Expand Down
9 changes: 7 additions & 2 deletions pkg/neg/syncers/utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1337,11 +1337,16 @@ func TestNegObjectCrd(t *testing.T) {
t.Errorf("Failed to find neg")
}

var expectedNegObj negv1beta1.NegObjectReference
expectedNegObj = negv1beta1.NegObjectReference{
var subnetURL string
if networkEndpointType != negtypes.NonGCPPrivateEndpointType {
subnetURL = testSubnetwork
}
expectedNegObj := negv1beta1.NegObjectReference{
Id: fmt.Sprint(neg.Id),
SelfLink: neg.SelfLink,
NetworkEndpointType: negv1beta1.NetworkEndpointType(networkEndpointType),
State: negv1beta1.ActiveState,
SubnetURL: subnetURL,
}

if negObj != expectedNegObj {
Expand Down
17 changes: 17 additions & 0 deletions pkg/neg/types/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,15 @@ import (
"reflect"
"strconv"

"github.com/GoogleCloudPlatform/k8s-cloud-provider/pkg/cloud"
"github.com/GoogleCloudPlatform/k8s-cloud-provider/pkg/cloud/meta"
apiv1 "k8s.io/api/core/v1"
discovery "k8s.io/api/discovery/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/ingress-gce/pkg/annotations"
negv1beta1 "k8s.io/ingress-gce/pkg/apis/svcneg/v1beta1"
"k8s.io/ingress-gce/pkg/flags"
"k8s.io/ingress-gce/pkg/network"
"k8s.io/ingress-gce/pkg/utils/namer"
Expand Down Expand Up @@ -382,3 +384,18 @@ func NodeFilterForNetworkEndpointType(negType NetworkEndpointType) zonegetter.Fi
}
return zonegetter.CandidateNodesFilter
}

// NegInfo holds the identifying information regarding a NEG.
type NegInfo struct {
Name string
Zone string
}

// NegInfoFromNegRef returns NegInfo by parsing the NEG selflink.
func NegInfoFromNegRef(negRef negv1beta1.NegObjectReference) (NegInfo, error) {
resourceID, err := cloud.ParseResourceURL(negRef.SelfLink)
if err != nil {
return NegInfo{}, fmt.Errorf("failed to parse selflink: %v", err)
}
return NegInfo{Name: resourceID.Key.Name, Zone: resourceID.Key.Zone}, nil
}

0 comments on commit 67b8e91

Please sign in to comment.