From 45a887e30cfd026cbb27896bbf3be4c59dbe76e5 Mon Sep 17 00:00:00 2001
From: Jorge Turrado
Date: Sat, 9 Dec 2023 12:27:44 +0100
Subject: [PATCH] Update scaler

Signed-off-by: Jorge Turrado
---
 pkg/k8s/endpoints_cache.go  | 31 ++++++++++++++++++++++
 scaler/handlers_test.go     | 12 ++++-----
 scaler/main.go              | 13 +++++-----
 scaler/queue_pinger.go      | 52 ++++++++++++++++++-------------------
 scaler/queue_pinger_test.go |  2 +-
 5 files changed, 70 insertions(+), 40 deletions(-)
 create mode 100644 pkg/k8s/endpoints_cache.go

diff --git a/pkg/k8s/endpoints_cache.go b/pkg/k8s/endpoints_cache.go
new file mode 100644
index 00000000..9c1202fa
--- /dev/null
+++ b/pkg/k8s/endpoints_cache.go
@@ -0,0 +1,31 @@
+package k8s
+
+import (
+	"encoding/json"
+
+	v1 "k8s.io/api/core/v1"
+	"k8s.io/apimachinery/pkg/watch"
+)
+
+// EndpointsCache is a simple cache of endpoints.
+// It allows callers to quickly get the endpoints with a given
+// name in a given namespace, or watch for changes to specific
+// endpoints, all without incurring the cost of issuing a network
+// request to the Kubernetes API.
+type EndpointsCache interface {
+	json.Marshaler
+	// Get gets the endpoints with the given name
+	// in the given namespace from the cache.
+	//
+	// If the endpoints don't exist in the cache, they
+	// will be requested from the backing store (most commonly
+	// the Kubernetes API server).
+	Get(namespace, name string) (v1.Endpoints, error)
+	// Watch opens a watch stream for the endpoints with
+	// the given name in the given namespace from the cache.
+	//
+	// If the endpoints don't exist in the cache, they
+	// will be requested from the backing store (most commonly
+	// the Kubernetes API server).
+	Watch(namespace, name string) (watch.Interface, error)
+}
diff --git a/scaler/handlers_test.go b/scaler/handlers_test.go
index fe1ec82c..023ec8a9 100644
--- a/scaler/handlers_test.go
+++ b/scaler/handlers_test.go
@@ -346,9 +346,9 @@ func TestGetMetricSpecTable(t *testing.T) {
 			},
 			Spec: httpv1alpha1.HTTPScaledObjectSpec{
 				ScaleTargetRef: httpv1alpha1.ScaleTargetRef{
-					Deployment: "testdepl",
-					Service:    "testsrv",
-					Port:       8080,
+					Name:    "testdepl",
+					Service: "testsrv",
+					Port:    8080,
 				},
 				TargetPendingRequests: ptr.To[int32](123),
 			},
@@ -387,9 +387,9 @@ func TestGetMetricSpecTable(t *testing.T) {
 					"validHost2",
 				},
 				ScaleTargetRef: httpv1alpha1.ScaleTargetRef{
-					Deployment: "testdepl",
-					Service:    "testsrv",
-					Port:       8080,
+					Name:    "testdepl",
+					Service: "testsrv",
+					Port:    8080,
 				},
 				TargetPendingRequests: ptr.To[int32](123),
 			},
diff --git a/scaler/main.go b/scaler/main.go
index bc9030d6..d41c05aa 100644
--- a/scaler/main.go
+++ b/scaler/main.go
@@ -33,7 +33,6 @@ import (
 )
 
 // +kubebuilder:rbac:groups="",resources=endpoints,verbs=get;list;watch
-// +kubebuilder:rbac:groups=apps,resources=deployments,verbs=get;list;watch
 // +kubebuilder:rbac:groups=http.keda.sh,resources=httpscaledobjects,verbs=get;list;watch
 
 func main() {
@@ -74,8 +73,8 @@ func main() {
 		os.Exit(1)
 	}
 
-	// create the deployment informer
-	deployInformer := k8s.NewInformerBackedDeploymentCache(
+	// create the endpoints informer
+	endpInformer := k8s.NewInformerBackedEndpointsCache(
 		lggr,
 		k8sCl,
 		cfg.DeploymentCacheRsyncPeriod,
@@ -94,11 +93,11 @@ func main() {
 
 	eg, ctx := errgroup.WithContext(ctx)
 
-	// start the deployment informer
+	// start the endpoints informer
 	eg.Go(func() error {
-		lggr.Info("starting the deployment informer")
+		lggr.Info("starting the endpoints informer")
 
-		deployInformer.Start(ctx)
+		endpInformer.Start(ctx)
 
 		return nil
 	})
@@ -113,7 +112,7 @@ func main() {
 	eg.Go(func() error {
 		lggr.Info("starting the queue pinger")
 
-		if err := pinger.start(ctx, time.NewTicker(cfg.QueueTickDuration), deployInformer); !util.IsIgnoredErr(err) {
+		if err := pinger.start(ctx, time.NewTicker(cfg.QueueTickDuration), endpInformer); !util.IsIgnoredErr(err) {
 			lggr.Error(err, "queue pinger failed")
 			return err
 		}
diff --git a/scaler/queue_pinger.go b/scaler/queue_pinger.go
index 6f139613..548c8dd8 100644
--- a/scaler/queue_pinger.go
+++ b/scaler/queue_pinger.go
@@ -34,16 +34,16 @@ import (
 //
 //	context
 //	go pinger.start(ctx, ticker)
 type queuePinger struct {
-	getEndpointsFn      k8s.GetEndpointsFunc
-	interceptorNS       string
-	interceptorSvcName  string
-	interceptorDeplName string
-	adminPort           string
-	pingMut             *sync.RWMutex
-	lastPingTime        time.Time
-	allCounts           map[string]int
-	aggregateCount      int
-	lggr                logr.Logger
+	getEndpointsFn         k8s.GetEndpointsFunc
+	interceptorNS          string
+	interceptorSvcName     string
+	interceptorServiceName string
+	adminPort              string
+	pingMut                *sync.RWMutex
+	lastPingTime           time.Time
+	allCounts              map[string]int
+	aggregateCount         int
+	lggr                   logr.Logger
 }
 
@@ -57,15 +57,15 @@ func newQueuePinger(
 ) (*queuePinger, error) {
 	pingMut := new(sync.RWMutex)
 	pinger := &queuePinger{
-		getEndpointsFn:      getEndpointsFn,
-		interceptorNS:       ns,
-		interceptorSvcName:  svcName,
-		interceptorDeplName: deplName,
-		adminPort:           adminPort,
-		pingMut:             pingMut,
-		lggr:                lggr,
-		allCounts:           map[string]int{},
-		aggregateCount:      0,
+		getEndpointsFn:         getEndpointsFn,
+		interceptorNS:          ns,
+		interceptorSvcName:     svcName,
+		interceptorServiceName: deplName,
+		adminPort:              adminPort,
+		pingMut:                pingMut,
+		lggr:                   lggr,
+		allCounts:              map[string]int{},
+		aggregateCount:         0,
 	}
 	return pinger, pinger.fetchAndSaveCounts(ctx)
 }
@@ -74,14 +74,14 @@ func newQueuePinger(
 func (q *queuePinger) start(
 	ctx context.Context,
 	ticker *time.Ticker,
-	deplCache k8s.DeploymentCache,
+	endpCache k8s.EndpointsCache,
 ) error {
-	deployWatchIface, err := deplCache.Watch(q.interceptorNS, q.interceptorDeplName)
+	endpoWatchIface, err := endpCache.Watch(q.interceptorNS, q.interceptorServiceName)
 	if err != nil {
 		return err
 	}
-	deployEvtChan := deployWatchIface.ResultChan()
-	defer deployWatchIface.Stop()
+	endpEvtChan := endpoWatchIface.ResultChan()
+	defer endpoWatchIface.Stop()
 
 	lggr := q.lggr.WithName("scaler.queuePinger.start")
 	defer ticker.Stop()
@@ -102,13 +102,13 @@ func (q *queuePinger) start(
 				return fmt.Errorf("error getting request counts: %w", err)
 			}
 		// handle changes to the interceptor fleet
-		// Deployment
-		case <-deployEvtChan:
+		// Endpoints
+		case <-endpEvtChan:
 			err := q.fetchAndSaveCounts(ctx)
 			if err != nil {
 				lggr.Error(
 					err,
-					"getting request counts after interceptor deployment event",
+					"getting request counts after interceptor endpoints event",
 				)
 			}
 		}
diff --git a/scaler/queue_pinger_test.go b/scaler/queue_pinger_test.go
index 2159ed70..9eb66243 100644
--- a/scaler/queue_pinger_test.go
+++ b/scaler/queue_pinger_test.go
@@ -73,7 +73,7 @@ func TestCounts(t *testing.T) {
 	r.NoError(q.Resize("host3", 3))
 	r.NoError(q.Resize("host4", 4))
 	ticker := time.NewTicker(tickDur)
-	fakeCache := k8s.NewFakeDeploymentCache()
+	fakeCache := k8s.NewFakeEndpointsCache()
 	go func() {
 		_ = pinger.start(ctx, ticker, fakeCache)
 	}()
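
For reviewers, here is a minimal usage sketch of the EndpointsCache interface added above, mirroring the Get-then-Watch pattern queuePinger.start now follows. It is not part of the patch: the import path for pkg/k8s and the watchInterceptorEndpoints helper are assumptions made for illustration only.

package example

import (
	"context"
	"fmt"

	// assumed module path for the add-on's pkg/k8s package
	"github.com/kedacore/http-add-on/pkg/k8s"
)

// watchInterceptorEndpoints is a hypothetical helper: it reads the current
// Endpoints once, then blocks on the watch stream until ctx is cancelled.
func watchInterceptorEndpoints(ctx context.Context, cache k8s.EndpointsCache, ns, name string) error {
	// Get serves from the cache, falling back to the backing store on a miss.
	endpoints, err := cache.Get(ns, name)
	if err != nil {
		return fmt.Errorf("getting endpoints %s/%s: %w", ns, name, err)
	}
	fmt.Printf("%s/%s currently has %d subsets\n", ns, name, len(endpoints.Subsets))

	// Watch streams subsequent changes to the same Endpoints object,
	// which is how the scaler now reacts to interceptor fleet changes.
	watcher, err := cache.Watch(ns, name)
	if err != nil {
		return fmt.Errorf("watching endpoints %s/%s: %w", ns, name, err)
	}
	defer watcher.Stop()

	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case evt, ok := <-watcher.ResultChan():
			if !ok {
				// watch channel closed by the cache
				return nil
			}
			fmt.Printf("endpoints event: %s\n", evt.Type)
		}
	}
}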