Skip to content

Commit

Permalink
Add support for multiple gRPC client management (#93)
Browse files Browse the repository at this point in the history
Fixes #46.

---------

Signed-off-by: Kobi Levi <[email protected]>
Co-authored-by: Shane Utt <[email protected]>
  • Loading branch information
levikobi and shaneutt authored Oct 27, 2023
1 parent 4e6ec82 commit 8bb3a4e
Show file tree
Hide file tree
Showing 8 changed files with 471 additions and 93 deletions.
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# Build the manager binary
FROM golang:1.19 as builder
FROM golang:1.21 as builder

LABEL org.opencontainers.image.source=https://github.com/kubernetes-sigs/blixt
LABEL org.opencontainers.image.description="An experimental layer 4 load-balancer built using eBPF/XDP with ebpf-go \
Expand Down
172 changes: 172 additions & 0 deletions controllers/dataplane_controller.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,172 @@
package controllers

import (
"context"
"fmt"

appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/predicate"

dataplane "github.com/kubernetes-sigs/blixt/internal/dataplane/client"
"github.com/kubernetes-sigs/blixt/pkg/vars"
)

//+kubebuilder:rbac:groups=gateway.networking.k8s.io,resources=gateways,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups=gateway.networking.k8s.io,resources=gateways/status,verbs=get;update;patch
//+kubebuilder:rbac:groups=gateway.networking.k8s.io,resources=gateways/finalizers,verbs=update

//+kubebuilder:rbac:groups=core,resources=services,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups=core,resources=services/status,verbs=get

//+kubebuilder:rbac:groups=core,resources=endpoints,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups=core,resources=endpoints/status,verbs=get

// DataplaneReconciler reconciles the dataplane pods.
type DataplaneReconciler struct {
client.Client
scheme *runtime.Scheme

backendsClientManager *dataplane.BackendsClientManager

updates chan event.GenericEvent
}

func NewDataplaneReconciler(client client.Client, schema *runtime.Scheme, manager *dataplane.BackendsClientManager) *DataplaneReconciler {
return &DataplaneReconciler{
Client: client,
scheme: schema,
backendsClientManager: manager,
updates: make(chan event.GenericEvent, 1),
}
}

var (
podOwnerKey = ".metadata.controller"
apiGVStr = appsv1.SchemeGroupVersion.String()
)

// SetupWithManager loads the controller into the provided controller manager.
func (r *DataplaneReconciler) SetupWithManager(mgr ctrl.Manager) error {

// In order to allow our reconciler to quickly look up Pods by their owner, we’ll
// need an index. We declare an index key that we can later use with the client
// as a pseudo-field name, and then describe how to extract the indexed value from
// the Pod object. The indexer will automatically take care of namespaces for us,
// so we just have to extract the owner name if the Pod has a DaemonSet owner.
if err := mgr.GetFieldIndexer().IndexField(context.Background(), &corev1.Pod{}, podOwnerKey, func(rawObj client.Object) []string {
// grab the pod object, extract the owner...
pod := rawObj.(*corev1.Pod)
owner := metav1.GetControllerOf(pod)
if owner == nil {
return nil
}
// ...make sure it's a DaemonSet...
if owner.APIVersion != apiGVStr || owner.Kind != "DaemonSet" {
return nil
}

// ...and if so, return it
return []string{owner.Name}
}); err != nil {
return err
}

return ctrl.NewControllerManagedBy(mgr).
For(&appsv1.DaemonSet{},
builder.WithPredicates(predicate.NewPredicateFuncs(r.daemonsetHasMatchingAnnotations)),
).
WithEventFilter(predicate.Funcs{
UpdateFunc: func(e event.UpdateEvent) bool {
return true
},
}).
Complete(r)
}

func (r *DataplaneReconciler) daemonsetHasMatchingAnnotations(obj client.Object) bool {
log := log.FromContext(context.Background())

daemonset, ok := obj.(*appsv1.DaemonSet)
if !ok {
log.Error(fmt.Errorf("received unexpected type in daemonset watch predicates: %T", obj), "THIS SHOULD NEVER HAPPEN!")
return false
}

// determine if this is a blixt daemonset
matchLabels := daemonset.Spec.Selector.MatchLabels
app, ok := matchLabels["app"]
if !ok || app != vars.DefaultDataPlaneAppLabel {
return false
}

// verify that it's the dataplane daemonset
component, ok := matchLabels["component"]
if !ok || component != vars.DefaultDataPlaneComponentLabel {
return false
}

return true
}

// Reconcile provisions (and de-provisions) resources relevant to this controller.
func (r *DataplaneReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
logger := log.FromContext(ctx)

ds := new(appsv1.DaemonSet)
if err := r.Client.Get(ctx, req.NamespacedName, ds); err != nil {
if errors.IsNotFound(err) {
logger.Info("DataplaneReconciler", "reconcile status", "object enqueued no longer exists, skipping")
return ctrl.Result{}, nil
}
return ctrl.Result{}, err
}

var childPods corev1.PodList
if err := r.List(ctx, &childPods, client.InNamespace(req.Namespace), client.MatchingFields{podOwnerKey: req.Name}); err != nil {
logger.Error(err, "DataplaneReconciler", "reconcile status", "unable to list child pods")
return ctrl.Result{}, err
}

readyPodByNN := make(map[types.NamespacedName]corev1.Pod)
for _, pod := range childPods.Items {
for _, container := range pod.Status.ContainerStatuses {
if container.Name == vars.DefaultDataPlaneComponentLabel && container.Ready {
key := types.NamespacedName{Namespace: pod.Namespace, Name: pod.Name}
readyPodByNN[key] = pod
}
}
}

logger.Info("DataplaneReconciler", "reconcile status", "setting updated backends client list", "num ready pods", len(readyPodByNN))
updated, err := r.backendsClientManager.SetClientsList(ctx, readyPodByNN)
if updated {
logger.Info("DataplaneReconciler", "reconcile status", "backends client list updated, sending generic event")
select {
case r.updates <- event.GenericEvent{Object: ds}:
logger.Info("DataplaneReconciler", "reconcile status", "generic event sent")
default:
logger.Info("DataplaneReconciler", "reconcile status", "generic event skipped - channel is full")
}
}
if err != nil {
logger.Error(err, "DataplaneReconciler", "reconcile status", "partial failure for backends client list update")
return ctrl.Result{Requeue: true}, err
}

logger.Info("DataplaneReconciler", "reconcile status", "done")
return ctrl.Result{}, nil
}

func (r *DataplaneReconciler) GetUpdates() <-chan event.GenericEvent {
return r.updates
}
42 changes: 16 additions & 26 deletions controllers/tcproute_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,16 @@ import (
"time"

"github.com/go-logr/logr"
appsv1 "k8s.io/api/apps/v1"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/source"
gatewayv1alpha2 "sigs.k8s.io/gateway-api/apis/v1alpha2"
gatewayv1beta1 "sigs.k8s.io/gateway-api/apis/v1beta1"

Expand All @@ -52,7 +53,9 @@ type TCPRouteReconciler struct {
client.Client
Scheme *runtime.Scheme

log logr.Logger
log logr.Logger
ClientReconcileRequestChan <-chan event.GenericEvent
BackendsClientManager *dataplane.BackendsClientManager
}

// SetupWithManager sets up the controller with the Manager.
Expand All @@ -61,8 +64,8 @@ func (r *TCPRouteReconciler) SetupWithManager(mgr ctrl.Manager) error {

return ctrl.NewControllerManagedBy(mgr).
For(&gatewayv1alpha2.TCPRoute{}).
Watches(
&appsv1.DaemonSet{},
WatchesRawSource(
&source.Channel{Source: r.ClientReconcileRequestChan},
handler.EnqueueRequestsFromMapFunc(r.mapDataPlaneDaemonsetToTCPRoutes),
).
Watches(
Expand Down Expand Up @@ -202,18 +205,11 @@ func (r *TCPRouteReconciler) ensureTCPRouteConfiguredInDataPlane(ctx context.Con
return err
}

// TODO: add multiple endpoint support https://github.com/Kong/blixt/issues/46
dataplaneClient, err := dataplane.NewDataPlaneClient(context.Background(), r.Client)
if err != nil {
if _, err = r.BackendsClientManager.Update(ctx, targets); err != nil {
return err
}

confirmation, err := dataplaneClient.Update(context.Background(), targets)
if err != nil {
return err
}

r.log.Info(fmt.Sprintf("successful data-plane UPDATE, confirmation: %s", confirmation.String()))
r.log.Info("successful data-plane UPDATE")

return nil
}
Expand All @@ -230,24 +226,17 @@ func (r *TCPRouteReconciler) ensureTCPRouteDeletedInDataPlane(ctx context.Contex
return err
}

// TODO: add multiple endpoint support https://github.com/Kong/blixt/issues/46
dataplaneClient, err := dataplane.NewDataPlaneClient(context.Background(), r.Client)
if err != nil {
return err
}

// since we currently only support one TCPRoute per Gateway, we can delete the vip (gateway)
// entry from the dataplane. this won't fly when we end up adding support for multiple TCPRoutes
// per Gateway.
confirmation, err := dataplaneClient.Delete(context.Background(), &dataplane.Vip{
vip := dataplane.Vip{
Ip: gatewayIP,
Port: gwPort,
})
if err != nil {
}

// delete the target from the dataplane
if _, err = r.BackendsClientManager.Delete(ctx, &vip); err != nil {
return err
}

r.log.Info(fmt.Sprintf("successful data-plane DELETE, confirmation: %s", confirmation.String()))
r.log.Info("successful data-plane DELETE")

oldFinalizers := tcproute.GetFinalizers()
newFinalizers := make([]string, 0, len(oldFinalizers)-1)
Expand All @@ -259,4 +248,5 @@ func (r *TCPRouteReconciler) ensureTCPRouteDeletedInDataPlane(ctx context.Contex
tcproute.SetFinalizers(newFinalizers)

return r.Client.Update(ctx, tcproute)

}
41 changes: 15 additions & 26 deletions controllers/udproute_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,16 @@ import (
"time"

"github.com/go-logr/logr"
appsv1 "k8s.io/api/apps/v1"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/source"
gatewayv1alpha2 "sigs.k8s.io/gateway-api/apis/v1alpha2"
gatewayv1beta1 "sigs.k8s.io/gateway-api/apis/v1beta1"

Expand All @@ -52,7 +53,9 @@ type UDPRouteReconciler struct {
client.Client
Scheme *runtime.Scheme

log logr.Logger
log logr.Logger
ClientReconcileRequestChan <-chan event.GenericEvent
BackendsClientManager *dataplane.BackendsClientManager
}

// SetupWithManager sets up the controller with the Manager.
Expand All @@ -61,8 +64,8 @@ func (r *UDPRouteReconciler) SetupWithManager(mgr ctrl.Manager) error {

return ctrl.NewControllerManagedBy(mgr).
For(&gatewayv1alpha2.UDPRoute{}).
Watches(
&appsv1.DaemonSet{},
WatchesRawSource(
&source.Channel{Source: r.ClientReconcileRequestChan},
handler.EnqueueRequestsFromMapFunc(r.mapDataPlaneDaemonsetToUDPRoutes),
).
Watches(
Expand Down Expand Up @@ -202,18 +205,11 @@ func (r *UDPRouteReconciler) ensureUDPRouteConfiguredInDataPlane(ctx context.Con
return err
}

// TODO: add multiple endpoint support https://github.com/kubernetes-sigs/blixt/issues/46
dataplaneClient, err := dataplane.NewDataPlaneClient(context.Background(), r.Client)
if err != nil {
if _, err = r.BackendsClientManager.Update(ctx, targets); err != nil {
return err
}

confirmation, err := dataplaneClient.Update(context.Background(), targets)
if err != nil {
return err
}

r.log.Info(fmt.Sprintf("successful data-plane UPDATE, confirmation: %s", confirmation.String()))
r.log.Info("successful data-plane UPDATE")

return nil
}
Expand All @@ -230,24 +226,17 @@ func (r *UDPRouteReconciler) ensureUDPRouteDeletedInDataPlane(ctx context.Contex
return err
}

// TODO: add multiple endpoint support https://github.com/kubernetes-sigs/blixt/issues/46
dataplaneClient, err := dataplane.NewDataPlaneClient(context.Background(), r.Client)
if err != nil {
return err
}

// since we currently only support one UDPRoute per Gateway, we can delete the vip (gateway)
// entry from the dataplane. this won't fly when we end up adding support for multiple UDPRoutes
// per Gateway.
confirmation, err := dataplaneClient.Delete(context.Background(), &dataplane.Vip{
vip := dataplane.Vip{
Ip: gatewayIP,
Port: gwPort,
})
if err != nil {
}

// delete the target from the dataplane
if _, err = r.BackendsClientManager.Delete(ctx, &vip); err != nil {
return err
}

r.log.Info(fmt.Sprintf("successful data-plane DELETE, confirmation: %s", confirmation.String()))
r.log.Info("successful data-plane DELETE")

oldFinalizers := udproute.GetFinalizers()
newFinalizers := make([]string, 0, len(oldFinalizers)-1)
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ require (
github.com/go-logr/logr v1.2.4
github.com/go-logr/stdr v1.2.2
github.com/google/uuid v1.3.1
github.com/kong/blixt v0.2.1
github.com/kong/kubernetes-testing-framework v0.39.1
github.com/onsi/ginkgo/v2 v2.13.0
github.com/onsi/gomega v1.28.1
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,8 @@ github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnr
github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo=
github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8=
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
github.com/kong/blixt v0.2.1 h1:QMBmHziwPhxgbUDl+Be+5cbQ6tbOT9JqGJJF8GtZs0U=
github.com/kong/blixt v0.2.1/go.mod h1:N6gFG9pVhbrva7dzadaWJycGDkG6EBFgybE1hJQoIR0=
github.com/kong/kubernetes-testing-framework v0.39.1 h1:30dTVe0Muda3r6NAMQHvdGLuB+nkhZRXnJA8AJjuvO4=
github.com/kong/kubernetes-testing-framework v0.39.1/go.mod h1:12TQ5gAkZhuxh47IJcW03iumky1X/T7ZCStuClQ1vzs=
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
Expand Down
Loading

0 comments on commit 8bb3a4e

Please sign in to comment.