Skip to content

Commit

Permalink
Update GlobalIngressIP internal service on exported service change
Browse files Browse the repository at this point in the history
...specifically the ports.

Fixes #1738

Signed-off-by: Tom Pantelis <[email protected]>
  • Loading branch information
tpantelis authored and aswinsuryan committed Dec 11, 2023
1 parent 5bfd32d commit 793f75e
Show file tree
Hide file tree
Showing 10 changed files with 151 additions and 93 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ require (
github.com/projectcalico/api v0.0.0-20230602153125-fb7148692637
github.com/prometheus-community/pro-bing v0.3.0
github.com/prometheus/client_golang v1.17.0
github.com/submariner-io/admiral v0.17.0-m1
github.com/submariner-io/admiral v0.17.0-m1.0.20231208185836-b2ed2d19de6a
github.com/submariner-io/shipyard v0.17.0-m1
github.com/uw-labs/lichen v0.1.7
github.com/vishvananda/netlink v1.2.1-beta.2
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -504,8 +504,8 @@ github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO
github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
github.com/stretchr/testify v1.8.2 h1:+h33VjcLVPDHtOdpUCuF+7gSuG3yGIftsP1YvFihtJ8=
github.com/stretchr/testify v1.8.2/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
github.com/submariner-io/admiral v0.17.0-m1 h1:P562LyX6p7JtZRzN6qIpn/PDYVm6BFk9Y0UsWF4jgrY=
github.com/submariner-io/admiral v0.17.0-m1/go.mod h1:QiImiDvL+FFRlAIsxLCZsLxe/NjuCl1ilGYT8Y4MtWU=
github.com/submariner-io/admiral v0.17.0-m1.0.20231208185836-b2ed2d19de6a h1:jXRguxmAjGNkGVAWpxy5+LBejWID1PlGBuIOK+GcypI=
github.com/submariner-io/admiral v0.17.0-m1.0.20231208185836-b2ed2d19de6a/go.mod h1:QiImiDvL+FFRlAIsxLCZsLxe/NjuCl1ilGYT8Y4MtWU=
github.com/submariner-io/shipyard v0.17.0-m1 h1:Hjm8uI4ZrhvnFqEDPxVhtOl9ViNUymIzbOi8M1SWzCA=
github.com/submariner-io/shipyard v0.17.0-m1/go.mod h1:BGeUHK0xxdKtOJLkkCU4LZpA2D9gLTINHZ7xThuTmcE=
github.com/syndtr/gocapability v0.0.0-20200815063812-42c35b437635/go.mod h1:hkRG7XYTFWNJGYcbNJQlaLq0fg1yr4J4t/NcTQtrfww=
Expand Down
17 changes: 0 additions & 17 deletions pkg/globalnet/controllers/base_controllers.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import (

"github.com/pkg/errors"
"github.com/submariner-io/admiral/pkg/federate"
resourceUtil "github.com/submariner-io/admiral/pkg/resource"
"github.com/submariner-io/admiral/pkg/util"
submarinerv1 "github.com/submariner-io/submariner/pkg/apis/submariner.io/v1"
iptiface "github.com/submariner-io/submariner/pkg/globalnet/controllers/iptables"
Expand Down Expand Up @@ -235,22 +234,6 @@ func getService(name, namespace string,
return service, true, nil
}

func createService(svc *corev1.Service,
client dynamic.NamespaceableResourceInterface,
) (*unstructured.Unstructured, error) {
gnService, err := resourceUtil.ToUnstructured(svc)
if err != nil {
return nil, err //nolint:wrapcheck // Let the caller wrap it
}

obj, err := client.Namespace(gnService.GetNamespace()).Create(context.TODO(), gnService, metav1.CreateOptions{})
if apierrors.IsAlreadyExists(err) {
err = nil
}

return obj, errors.Wrapf(err, "error creating the internal Service %s/%s", svc.Namespace, svc.Name)
}

func deleteService(namespace, name string,
client dynamic.NamespaceableResourceInterface,
) error {
Expand Down
23 changes: 4 additions & 19 deletions pkg/globalnet/controllers/controllers_suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -359,34 +359,19 @@ func (t *testDriverBase) awaitIngressIPStatusAllocated(name string) {
}

func (t *testDriverBase) awaitGlobalIngressIP(name string) *submarinerv1.GlobalIngressIP {
obj := test.AwaitResource(t.globalIngressIPs, name)

gip := &submarinerv1.GlobalIngressIP{}
Expect(runtime.DefaultUnstructuredConverter.FromUnstructured(obj.Object, gip)).To(Succeed())

return gip
return resource.MustFromUnstructured(test.AwaitResource(t.globalIngressIPs, name), &submarinerv1.GlobalIngressIP{})
}

func (t *testDriverBase) awaitService(name string) *corev1.Service {
obj := test.AwaitResource(t.services, name)

svc := &corev1.Service{}
Expect(runtime.DefaultUnstructuredConverter.FromUnstructured(obj.Object, svc)).To(Succeed())

return svc
return resource.MustFromUnstructured(test.AwaitResource(t.services, name), &corev1.Service{})
}

func (t *testDriverBase) awaitNoService(name string) {
test.AwaitNoResource(t.services, name)
}

func (t *testDriverBase) awaitEndpoints(name string) *corev1.Endpoints {
obj := test.AwaitResource(t.endpoints, name)

ep := &corev1.Endpoints{}
Expect(runtime.DefaultUnstructuredConverter.FromUnstructured(obj.Object, ep)).To(Succeed())

return ep
return resource.MustFromUnstructured(test.AwaitResource(t.endpoints, name), &corev1.Endpoints{})
}

func (t *testDriverBase) awaitNoEndpoints(name string) {
Expand Down Expand Up @@ -632,7 +617,7 @@ func newClusterIPService() *corev1.Service {
Ports: []corev1.ServicePort{{
Name: serviceName,
Port: int32(8080),
TargetPort: intstr.FromInt(8080),
TargetPort: intstr.FromInt32(8080),
Protocol: corev1.ProtocolTCP,
}},
},
Expand Down
6 changes: 3 additions & 3 deletions pkg/globalnet/controllers/gateway_monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -363,12 +363,12 @@ func (g *gatewayMonitor) startControllers() error {

// The GlobalIngressIP controller needs to be started before the ServiceExport and Service controllers to ensure
// reconciliation works properly.
c, err = NewGlobalIngressIPController(g.syncerConfig, pool)
gipController, err := NewGlobalIngressIPController(g.syncerConfig, pool)
if err != nil {
return errors.Wrap(err, "error creating the GlobalIngressIP controller")
}

g.controllers = append(g.controllers, c)
g.controllers = append(g.controllers, gipController)

podControllers, err := NewIngressPodControllers(g.syncerConfig)
if err != nil {
Expand All @@ -393,7 +393,7 @@ func (g *gatewayMonitor) startControllers() error {

g.controllers = append(g.controllers, seController)

c, err = NewServiceController(g.syncerConfig, podControllers, seController.GetSyncer())
c, err = NewServiceController(g.syncerConfig, podControllers, seController.GetSyncer(), gipController.GetSyncer())
if err != nil {
return errors.Wrap(err, "error creating the Service controller")
}
Expand Down
91 changes: 67 additions & 24 deletions pkg/globalnet/controllers/global_ingressip_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,11 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/tools/cache"
"k8s.io/utils/ptr"
)

func NewGlobalIngressIPController(config *syncer.ResourceSyncerConfig, pool *ipam.IPPool) (Interface, error) {
//nolint:revive // Ignore "unexported-return:... which can be annoying to use"; it's only used by unit tests.
func NewGlobalIngressIPController(config *syncer.ResourceSyncerConfig, pool *ipam.IPPool) (*globalIngressIPController, error) {
// We'll panic if config is nil, this is intentional
var err error

Expand Down Expand Up @@ -132,6 +134,10 @@ func NewGlobalIngressIPController(config *syncer.ResourceSyncerConfig, pool *ipa
return controller, nil
}

func (c *globalIngressIPController) GetSyncer() syncer.Interface {
return c.resourceSyncer
}

func (c *globalIngressIPController) process(from runtime.Object, numRequeues int, op syncer.Operation) (runtime.Object, bool) {
ingressIP := from.(*submarinerv1.GlobalIngressIP)

Expand All @@ -156,9 +162,10 @@ func (c *globalIngressIPController) process(from runtime.Object, numRequeues int
}

func (c *globalIngressIPController) onCreate(ingressIP *submarinerv1.GlobalIngressIP) bool {
// If Ingress GlobalIP is already allocated, simply return.
// If the Ingress GlobalIP is already allocated, we may have gotten here due to an underlying service update (eg ports changed) in
// which case we need to update the internal service for non-headless.
if ingressIP.Status.AllocatedIP != "" {
return false
return c.onUpdate(ingressIP)
}

key, _ := cache.MetaNamespaceKeyFunc(ingressIP)
Expand Down Expand Up @@ -196,29 +203,11 @@ func (c *globalIngressIPController) onCreate(ingressIP *submarinerv1.GlobalIngre
return false
}

internalService := &corev1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: GetInternalSvcName(service.Name),
Namespace: service.Namespace,
Labels: map[string]string{
InternalServiceLabel: service.Name,
},
Finalizers: []string{InternalServiceFinalizer},
},
}

extIPs := []string{ips[0]}
internalService.Spec.Ports = service.Spec.Ports
internalService.Spec.Selector = service.Spec.Selector
ipFamilySingleStack := corev1.IPFamilyPolicySingleStack
internalService.Spec.IPFamilyPolicy = &ipFamilySingleStack
internalService.Spec.ExternalIPs = extIPs

_, err = createService(internalService, c.services)
err = c.createOrUpdateInternalService(service, ips[0])
if err != nil {
_ = c.pool.Release(ips...)
key := fmt.Sprintf("%s/%s", internalService.Namespace, internalService.Name)
logger.Errorf(err, "Failed to create the internal Service %q ", key)

logger.Errorf(err, "Failed to create the internal Service for %q", key)

meta.SetStatusCondition(&ingressIP.Status.Conditions, metav1.Condition{
Type: string(submarinerv1.GlobalEgressIPAllocated),
Expand Down Expand Up @@ -289,6 +278,60 @@ func (c *globalIngressIPController) onCreate(ingressIP *submarinerv1.GlobalIngre
return false
}

func (c *globalIngressIPController) createOrUpdateInternalService(from *corev1.Service, extIP string) error {
internalService := &corev1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: GetInternalSvcName(from.Name),
Labels: map[string]string{
InternalServiceLabel: from.Name,
},
Finalizers: []string{InternalServiceFinalizer},
},
Spec: corev1.ServiceSpec{
Ports: from.Spec.Ports,
Selector: from.Spec.Selector,
ExternalIPs: []string{extIP},
IPFamilyPolicy: ptr.To(corev1.IPFamilyPolicySingleStack),
},
}

obj := resource.MustToUnstructured(internalService)
result, err := util.CreateOrUpdate(context.TODO(), resource.ForDynamic(c.services.Namespace(from.Namespace)), obj, util.Replace(obj))

if result == util.OperationResultCreated {
logger.Infof("Created internal service \"%s/%s\"", from.Namespace, internalService.Name)
} else if result == util.OperationResultUpdated {
logger.Infof("Updated internal service \"%s/%s\"", from.Namespace, internalService.Name)
}

return err //nolint:wrapcheck // No need to wrap here
}

func (c *globalIngressIPController) onUpdate(ingressIP *submarinerv1.GlobalIngressIP) bool {
if ingressIP.Spec.Target != submarinerv1.ClusterIPService {
return false
}

service, exists, err := getService(ingressIP.Spec.ServiceRef.Name, ingressIP.Namespace, c.services, c.scheme)
if !exists {
return false
}

if err != nil {
logger.Errorf(err, "Error retrieving exported Service \"%s/%s\" - re-queueing", ingressIP.Namespace,
ingressIP.Spec.ServiceRef.Name)
return true
}

err = c.createOrUpdateInternalService(service, ingressIP.Status.AllocatedIP)
if err != nil {
logger.Errorf(err, "Failed to update the internal Service for \"%s/%s\"", ingressIP.Namespace, ingressIP.Name)
return true
}

return false
}

//nolint:wrapcheck // No need to wrap these errors.
func (c *globalIngressIPController) onDelete(ingressIP *submarinerv1.GlobalIngressIP, numRequeues int) bool {
if ingressIP.Status.AllocatedIP == "" {
Expand Down
22 changes: 13 additions & 9 deletions pkg/globalnet/controllers/global_ingressip_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,23 +128,26 @@ var _ = Describe("GlobalIngressIP controller", func() {
})

func testGlobalIngressIPCreatedClusterIPSvc(t *globalIngressIPControllerTestDriver, ingressIP *submarinerv1.GlobalIngressIP) {
var service *corev1.Service

JustBeforeEach(func() {
service := newClusterIPService()
service = newClusterIPService()
t.createService(service)
t.createGlobalIngressIP(ingressIP)
})

It("should successfully allocate a global IP", func() {
It("should create an internal submariner service with an allocated global IP", func() {
t.awaitIngressIPStatusAllocated(globalIngressIPName)
allocatedIP := t.getGlobalIngressIPStatus(globalIngressIPName).AllocatedIP
Expect(allocatedIP).ToNot(BeEmpty())
})

It("should successfully create an internal submariner service", func() {
internalSvcName := controllers.GetInternalSvcName(serviceName)
intSvc := t.awaitService(internalSvcName)
externalIP := intSvc.Spec.ExternalIPs[0]
Expect(externalIP).ToNot(BeEmpty())
Expect(externalIP).To(Equal(allocatedIP))
Expect(intSvc.Spec.Ports).To(Equal(service.Spec.Ports))
Expect(intSvc.Spec.IPFamilyPolicy).ToNot(BeNil())
Expect(*intSvc.Spec.IPFamilyPolicy).To(Equal(corev1.IPFamilyPolicySingleStack))

finalizer := intSvc.GetFinalizers()[0]
Expect(finalizer).To(Equal(controllers.InternalServiceFinalizer))
Expand Down Expand Up @@ -519,17 +522,18 @@ func newGlobalIngressIPControllerDriver() *globalIngressIPControllerTestDriver {
return t
}

func (t *globalIngressIPControllerTestDriver) start() {
var err error

t.controller, err = controllers.NewGlobalIngressIPController(&syncer.ResourceSyncerConfig{
func (t *globalIngressIPControllerTestDriver) start() syncer.Interface {
controller, err := controllers.NewGlobalIngressIPController(&syncer.ResourceSyncerConfig{
SourceClient: t.dynClient,
RestMapper: t.restMapper,
Scheme: t.scheme,
}, t.pool)
t.controller = controller

Expect(err).To(Succeed())
Expect(t.controller.Start()).To(Succeed())

return controller.GetSyncer()
}

func (t *globalIngressIPControllerTestDriver) awaitPodEgressRules(podIP, snatIP string) {
Expand Down
15 changes: 10 additions & 5 deletions pkg/globalnet/controllers/service_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
)

func NewServiceController(config *syncer.ResourceSyncerConfig, podControllers *IngressPodControllers, serviceExportSyncer syncer.Interface,
gipSyncer syncer.Interface,
) (Interface, error) {
// We'll panic if config is nil, this is intentional
var err error
Expand All @@ -42,6 +43,7 @@ func NewServiceController(config *syncer.ResourceSyncerConfig, podControllers *I
baseSyncerController: newBaseSyncerController(),
podControllers: podControllers,
serviceExportSyncer: serviceExportSyncer,
gipSyncer: gipSyncer,
}

controller.resourceSyncer, err = syncer.NewResourceSyncer(&syncer.ResourceSyncerConfig{
Expand Down Expand Up @@ -99,20 +101,23 @@ func (c *serviceController) Start() error {
func (c *serviceController) process(from runtime.Object, _ int, op syncer.Operation) (runtime.Object, bool) {
service := from.(*corev1.Service)

if service.Spec.Type != corev1.ServiceTypeClusterIP || op == syncer.Update {
if service.Spec.Type != corev1.ServiceTypeClusterIP {
return nil, false
}

key, _ := cache.MetaNamespaceKeyFunc(service)
logger.Infof("Service %q %sd", key, op)

if op == syncer.Delete {
switch op {
case syncer.Create:
// For Create, requeue the associated ServiceExport, if any, to re-create the GlobalIngressIP.
c.serviceExportSyncer.RequeueResource(service.Name, service.Namespace)
case syncer.Delete:
return c.onDelete(service)
case syncer.Update:
c.gipSyncer.RequeueResource(service.Name, service.Namespace)
}

// For Create, requeue the associated ServiceExport, if any, to re-create the GlobalIngressIP.
c.serviceExportSyncer.RequeueResource(service.Name, service.Namespace)

return nil, false
}

Expand Down
Loading

0 comments on commit 793f75e

Please sign in to comment.