Skip to content

Commit

Permalink
No longer get negURL from API if failed when querying svcNeg.
Browse files Browse the repository at this point in the history
* Once multi-subnet cluster is enabled, there is no deterministic way to
  get the NEG names of all NEGs in Ingress controller since we require
  information on the additional subnets. Thus, we should just return
  and stop if we fail to get NEG URLs from SvcNeg.
  • Loading branch information
sawsa307 committed Jul 31, 2024
1 parent 367f98e commit 22d9677
Show file tree
Hide file tree
Showing 7 changed files with 234 additions and 162 deletions.
21 changes: 7 additions & 14 deletions pkg/backends/neg_linker.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,16 +64,9 @@ func (nl *negLinker) Link(sp utils.ServicePort, groups []GroupKey) error {
version := befeatures.VersionFromServicePort(&sp)
negName := sp.NEGName()
svcNegKey := fmt.Sprintf("%s/%s", sp.ID.Service.Namespace, negName)
negSelfLinks, ok := getNegUrlsFromSvcneg(svcNegKey, nl.svcNegLister, nl.logger)
if !ok {
for _, group := range groups {
nl.logger.V(4).Info("Falling back to use NEG API to retrieve NEG url for NEG", "negName", negName)
neg, err := nl.negGetter.GetNetworkEndpointGroup(negName, group.Zone, version, nl.logger)
if err != nil {
return err
}
negSelfLinks = append(negSelfLinks, neg.SelfLink)
}
negSelfLinks, err := getNegUrlsFromSvcneg(svcNegKey, nl.svcNegLister, nl.logger)
if err != nil {
return fmt.Errorf("failed to get neg links from svcneg: %v", err)
}

beName := sp.BackendName()
Expand Down Expand Up @@ -237,22 +230,22 @@ func getNegType(sp utils.ServicePort) types.NetworkEndpointType {
}

// getNegUrlsFromSvcneg return NEG urls from svcneg status
func getNegUrlsFromSvcneg(key string, svcNegLister cache.Indexer, logger klog.Logger) ([]string, bool) {
func getNegUrlsFromSvcneg(key string, svcNegLister cache.Indexer, logger klog.Logger) ([]string, error) {
var negUrls []string
obj, exists, err := svcNegLister.GetByKey(key)
if err != nil {
logger.Error(err, "Failed to retrieve svcneg from cache", "svcneg", key)
return nil, false
return nil, err
}
if !exists {
return nil, false
return nil, fmt.Errorf("svcNeg %s does not exist in cache", key)
}
svcneg := obj.(*negv1beta1.ServiceNetworkEndpointGroup)

for _, negRef := range svcneg.Status.NetworkEndpointGroups {
negUrls = append(negUrls, negRef.SelfLink)
}
return negUrls, true
return negUrls, nil
}

// relativeResourceNameWithDefault will attempt to return a RelativeResourceName
Expand Down
211 changes: 114 additions & 97 deletions pkg/backends/neg_linker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/GoogleCloudPlatform/k8s-cloud-provider/pkg/cloud/mock"
"github.com/google/go-cmp/cmp"
"github.com/kr/pretty"
meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/cloud-provider-gcp/providers/gce"
Expand All @@ -52,127 +53,143 @@ func newTestNEGLinker(fakeNEG negtypes.NetworkEndpointGroupCloud, fakeGCE *gce.C
}

func TestLinkBackendServiceToNEG(t *testing.T) {
for _, tc := range []struct {
name string
populateSvcNeg bool
t.Parallel()

fakeGCE := gce.NewFakeGCECloud(gce.DefaultTestClusterValues())
fakeNEG := negtypes.NewFakeNetworkEndpointGroupCloud("test-subnetwork", "test-network")
linker := newTestNEGLinker(fakeNEG, fakeGCE)

zones := []GroupKey{{Zone: "zone1"}, {Zone: "zone2"}}
namespace, svcName, port := "ns", "name", "port"
svc := types.NamespacedName{Namespace: namespace, Name: svcName}

// validate different service port for both L4 ILB and L7 LBs
testCases := []struct {
desc string
svcPort utils.ServicePort
}{
{
name: "Get NEG URL via API",
populateSvcNeg: false,
desc: "Link L4 NEGs",
svcPort: utils.ServicePort{
ID: utils.ServicePortID{Service: svc},
BackendNamer: defaultL4Namer,
VMIPNEGEnabled: true,
},
},
{
name: "Get NEG URL via SvcNeg",
populateSvcNeg: true,
desc: "Link Ingress NEGs",
svcPort: utils.ServicePort{
ID: utils.ServicePortID{Service: svc},
Port: 80,
NodePort: 30001,
Protocol: annotations.ProtocolHTTP,
TargetPort: intstr.FromString(port),
NEGEnabled: true,
BackendNamer: defaultNamer,
},
},
} {
t.Run(tc.name, func(t *testing.T) {
fakeGCE := gce.NewFakeGCECloud(gce.DefaultTestClusterValues())
fakeNEG := negtypes.NewFakeNetworkEndpointGroupCloud("test-subnetwork", "test-network")
linker := newTestNEGLinker(fakeNEG, fakeGCE)

zones := []GroupKey{{Zone: "zone1"}, {Zone: "zone2"}}
namespace, name, port := "ns", "name", "port"
svc := types.NamespacedName{Namespace: namespace, Name: name}
{
desc: "Link RXLB Ingress",
svcPort: utils.ServicePort{
ID: utils.ServicePortID{Service: svc},
Port: 80,
NodePort: 30001,
Protocol: annotations.ProtocolHTTP,
TargetPort: intstr.FromString(port),
NEGEnabled: true,
L7XLBRegionalEnabled: true,
BackendNamer: defaultNamer,
},
},
}

// validate different service port for both L4 ILB and L7 LBs
for _, svcPort := range []utils.ServicePort{
{
ID: utils.ServicePortID{Service: svc},
BackendNamer: defaultNamer,
VMIPNEGEnabled: true},
{
ID: utils.ServicePortID{Service: svc},
Port: 80,
NodePort: 30001,
Protocol: annotations.ProtocolHTTP,
TargetPort: intstr.FromString(port),
NEGEnabled: true,
BackendNamer: defaultNamer},
{
ID: utils.ServicePortID{Service: svc},
Port: 80,
NodePort: 30001,
Protocol: annotations.ProtocolHTTP,
TargetPort: intstr.FromString(port),
NEGEnabled: true,
L7XLBRegionalEnabled: true,
BackendNamer: defaultNamer},
} {
// Mimic how the syncer would create the backend.
if _, err := linker.backendPool.Create(svcPort, "fake-healthcheck-link", klog.TODO()); err != nil {
t.Fatalf("Failed to create backend service to NEG for svcPort %v: %v", svcPort, err)
}
for _, tc := range testCases {
t.Run(tc.desc, func(t *testing.T) {
// Mimic how the syncer would create the backend.
if _, err := linker.backendPool.Create(tc.svcPort, "fake-healthcheck-link", klog.TODO()); err != nil {
t.Fatalf("Failed to create backend service to NEG for svcPort %v: %v", tc.svcPort, err)
}

version := befeatures.VersionFromServicePort(&svcPort)
version := befeatures.VersionFromServicePort(&tc.svcPort)

if tc.populateSvcNeg {
linker.svcNegLister.Add(v1beta1.ServiceNetworkEndpointGroup{Status: v1beta1.ServiceNetworkEndpointGroupStatus{
err := linker.svcNegLister.Add(
&v1beta1.ServiceNetworkEndpointGroup{
TypeMeta: meta_v1.TypeMeta{
Kind: "ServiceNetworkEndpointGroup",
APIVersion: "networking.gke.io/v1beta1",
},
ObjectMeta: meta_v1.ObjectMeta{
Name: tc.svcPort.NEGName(),
Namespace: namespace,
},
Status: v1beta1.ServiceNetworkEndpointGroupStatus{
NetworkEndpointGroups: []v1beta1.NegObjectReference{
{SelfLink: fmt.Sprintf("https://www.googleapis.com/compute/alpha/projects/mock-project/zones/zone1/networkEndpointGroups/%s", svcPort.NEGName())},
{SelfLink: fmt.Sprintf("https://www.googleapis.com/compute/alpha/projects/mock-project/zones/zone2/networkEndpointGroups/%s", svcPort.NEGName())},
{SelfLink: fmt.Sprintf("https://www.googleapis.com/compute/alpha/projects/mock-project/zones/zone1/networkEndpointGroups/%s", tc.svcPort.NEGName())},
{SelfLink: fmt.Sprintf("https://www.googleapis.com/compute/alpha/projects/mock-project/zones/zone2/networkEndpointGroups/%s", tc.svcPort.NEGName())},
},
}})
if err != nil {
t.Fatalf("Failed to add svcneg: %v", err)
}

for _, key := range zones {
neg := &composite.NetworkEndpointGroup{
Name: tc.svcPort.NEGName(),
Version: version,
}
for _, key := range zones {
neg := &composite.NetworkEndpointGroup{
Name: svcPort.NEGName(),
Version: version,
}
if svcPort.VMIPNEGEnabled {
neg.NetworkEndpointType = string(negtypes.VmIpEndpointType)
}
err := fakeNEG.CreateNetworkEndpointGroup(neg, key.Zone, klog.TODO())
if err != nil {
t.Fatalf("unexpected error creating NEG for svcPort %v: %v", svcPort, err)
}
if tc.svcPort.VMIPNEGEnabled {
neg.NetworkEndpointType = string(negtypes.VmIpEndpointType)
}
err := fakeNEG.CreateNetworkEndpointGroup(neg, key.Zone, klog.TODO())
if err != nil {
t.Fatalf("unexpected error creating NEG for svcPort %v: %v", tc.svcPort, err)
}
}

if err := linker.Link(svcPort, zones); err != nil {
t.Fatalf("Failed to link backend service to NEG for svcPort %v: %v", svcPort, err)
if err := linker.Link(tc.svcPort, zones); err != nil {
t.Fatalf("Failed to link backend service to NEG for svcPort %v: %v", tc.svcPort, err)
}

// validate function validates if the state is expected
validate := func() {
beName := tc.svcPort.BackendName()
scope := befeatures.ScopeFromServicePort(&tc.svcPort)
key, err := composite.CreateKey(fakeGCE, beName, scope)
if err != nil {
t.Fatalf("Failed to create composite key - %v", err)
}
bs, err := composite.GetBackendService(fakeGCE, key, version, klog.TODO())
if err != nil {
t.Fatalf("Failed to retrieve backend service using key %+v for svcPort %v: %v", key, tc.svcPort, err)
}
if len(bs.Backends) != len(zones) {
t.Errorf("Expect %v backends in backend service %s, but got %v.key %+v %+v", len(zones), beName, len(bs.Backends), key, bs)
}

// validate function validates if the state is expected
validate := func() {
beName := svcPort.BackendName()
scope := befeatures.ScopeFromServicePort(&svcPort)
key, err := composite.CreateKey(fakeGCE, beName, scope)
if err != nil {
t.Fatalf("Failed to create composite key - %v", err)
}
bs, err := composite.GetBackendService(fakeGCE, key, version, klog.TODO())
if err != nil {
t.Fatalf("Failed to retrieve backend service using key %+v for svcPort %v: %v", key, svcPort, err)
}
if len(bs.Backends) != len(zones) {
t.Errorf("Expect %v backends in backend service %s, but got %v.key %+v %+v", len(zones), beName, len(bs.Backends), key, bs)
for _, be := range bs.Backends {
neg := "networkEndpointGroups"
if !strings.Contains(be.Group, neg) {
t.Errorf("Got backend link %q, want containing %q", be.Group, neg)
}

for _, be := range bs.Backends {
neg := "networkEndpointGroups"
if !strings.Contains(be.Group, neg) {
t.Errorf("Got backend link %q, want containing %q", be.Group, neg)
}
if svcPort.VMIPNEGEnabled {
// Balancing mode should be connection, rate should be unset
if be.BalancingMode != string(Connections) || be.MaxRatePerEndpoint != 0 {
t.Errorf("Only 'Connection' balancing mode is supported with VM_IP NEGs, Got %q with max rate %v", be.BalancingMode, be.MaxRatePerEndpoint)
}
if tc.svcPort.VMIPNEGEnabled {
// Balancing mode should be connection, rate should be unset
if be.BalancingMode != string(Connections) || be.MaxRatePerEndpoint != 0 {
t.Errorf("Only 'Connection' balancing mode is supported with VM_IP NEGs, Got %q with max rate %v", be.BalancingMode, be.MaxRatePerEndpoint)
}
}
}
}

validate()

// mimic cluster node shrinks to one of the zone
shrinkZone := []GroupKey{zones[0]}
if err := linker.Link(svcPort, shrinkZone); err != nil {
t.Fatalf("Failed to link backend service to NEG for svcPort %v: %v", svcPort, err)
}
validate()

validate()
// mimic cluster node shrinks to one of the zone
shrinkZone := []GroupKey{zones[0]}
if err := linker.Link(tc.svcPort, shrinkZone); err != nil {
t.Fatalf("Failed to link backend service to NEG for svcPort %v: %v", tc.svcPort, err)
}
})

validate()
})
}

}
Expand Down
1 change: 1 addition & 0 deletions pkg/backends/syncer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ func (p *portset) check(fakeGCE *gce.Cloud) error {

var (
defaultNamer = namer.NewNamer("uid1", "fw1", klog.TODO())
defaultL4Namer = namer.NewL4Namer("uid1", nil)
defaultBackendSvc = types.NamespacedName{Namespace: "system", Name: "default"}
existingProbe = &api_v1.Probe{
ProbeHandler: api_v1.ProbeHandler{
Expand Down
Loading

0 comments on commit 22d9677

Please sign in to comment.