Skip to content

Commit

Permalink
Update scaler
Browse files Browse the repository at this point in the history
Signed-off-by: Jorge Turrado <[email protected]>
  • Loading branch information
JorTurFer committed Dec 9, 2023
1 parent 79b1816 commit 45a887e
Show file tree
Hide file tree
Showing 5 changed files with 70 additions and 40 deletions.
31 changes: 31 additions & 0 deletions pkg/k8s/endpoints_cache.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package k8s

import (
"encoding/json"

v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/watch"
)

// EndpointsCache is a simple cache of endpoints.
// It allows callers to quickly get given endpoints in a given
// namespace, or watch for changes to specific endpoints, all
// without incurring the cost of issuing a network request
// to the Kubernetes API
type EndpointsCache interface {
json.Marshaler
// Get gets the endpoints with the given name
// in the given namespace from the cache.
//
// If the endpoints doesn't exist in the cache, it
// will be requested from the backing store (most commonly
// the Kubernetes API server)
Get(namespace, name string) (v1.Endpoints, error)
// Watch opens a watch stream for the endpoints with
// the given name in the given namespace from the cache.
//
// If the endpoints don't exist in the cache, it
// will be requested from the backing store (most commonly
// the Kubernetes API server)
Watch(namespace, name string) (watch.Interface, error)
}
12 changes: 6 additions & 6 deletions scaler/handlers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -346,9 +346,9 @@ func TestGetMetricSpecTable(t *testing.T) {
},
Spec: httpv1alpha1.HTTPScaledObjectSpec{
ScaleTargetRef: httpv1alpha1.ScaleTargetRef{
Deployment: "testdepl",
Service: "testsrv",
Port: 8080,
Name: "testdepl",
Service: "testsrv",
Port: 8080,
},
TargetPendingRequests: ptr.To[int32](123),
},
Expand Down Expand Up @@ -387,9 +387,9 @@ func TestGetMetricSpecTable(t *testing.T) {
"validHost2",
},
ScaleTargetRef: httpv1alpha1.ScaleTargetRef{
Deployment: "testdepl",
Service: "testsrv",
Port: 8080,
Name: "testdepl",
Service: "testsrv",
Port: 8080,
},
TargetPendingRequests: ptr.To[int32](123),
},
Expand Down
13 changes: 6 additions & 7 deletions scaler/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ import (
)

// +kubebuilder:rbac:groups="",resources=endpoints,verbs=get;list;watch
// +kubebuilder:rbac:groups=apps,resources=deployments,verbs=get;list;watch
// +kubebuilder:rbac:groups=http.keda.sh,resources=httpscaledobjects,verbs=get;list;watch

func main() {
Expand Down Expand Up @@ -74,8 +73,8 @@ func main() {
os.Exit(1)
}

// create the deployment informer
deployInformer := k8s.NewInformerBackedDeploymentCache(
// create the endpoints informer
endpInformer := k8s.NewInformerBackedEndpointsCache(
lggr,
k8sCl,
cfg.DeploymentCacheRsyncPeriod,
Expand All @@ -94,11 +93,11 @@ func main() {

eg, ctx := errgroup.WithContext(ctx)

// start the deployment informer
// start the endpoints informer
eg.Go(func() error {
lggr.Info("starting the deployment informer")
lggr.Info("starting the endpoints informer")

deployInformer.Start(ctx)
endpInformer.Start(ctx)
return nil
})

Expand All @@ -113,7 +112,7 @@ func main() {
eg.Go(func() error {
lggr.Info("starting the queue pinger")

if err := pinger.start(ctx, time.NewTicker(cfg.QueueTickDuration), deployInformer); !util.IsIgnoredErr(err) {
if err := pinger.start(ctx, time.NewTicker(cfg.QueueTickDuration), endpInformer); !util.IsIgnoredErr(err) {
lggr.Error(err, "queue pinger failed")
return err
}
Expand Down
52 changes: 26 additions & 26 deletions scaler/queue_pinger.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,16 +34,16 @@ import (
// // context
// go pinger.start(ctx, ticker)
type queuePinger struct {
getEndpointsFn k8s.GetEndpointsFunc
interceptorNS string
interceptorSvcName string
interceptorDeplName string
adminPort string
pingMut *sync.RWMutex
lastPingTime time.Time
allCounts map[string]int
aggregateCount int
lggr logr.Logger
getEndpointsFn k8s.GetEndpointsFunc
interceptorNS string
interceptorSvcName string
interceptorServiceName string
adminPort string
pingMut *sync.RWMutex
lastPingTime time.Time
allCounts map[string]int
aggregateCount int
lggr logr.Logger
}

func newQueuePinger(
Expand All @@ -57,15 +57,15 @@ func newQueuePinger(
) (*queuePinger, error) {
pingMut := new(sync.RWMutex)
pinger := &queuePinger{
getEndpointsFn: getEndpointsFn,
interceptorNS: ns,
interceptorSvcName: svcName,
interceptorDeplName: deplName,
adminPort: adminPort,
pingMut: pingMut,
lggr: lggr,
allCounts: map[string]int{},
aggregateCount: 0,
getEndpointsFn: getEndpointsFn,
interceptorNS: ns,
interceptorSvcName: svcName,
interceptorServiceName: deplName,
adminPort: adminPort,
pingMut: pingMut,
lggr: lggr,
allCounts: map[string]int{},
aggregateCount: 0,
}
return pinger, pinger.fetchAndSaveCounts(ctx)
}
Expand All @@ -74,14 +74,14 @@ func newQueuePinger(
func (q *queuePinger) start(
ctx context.Context,
ticker *time.Ticker,
deplCache k8s.DeploymentCache,
endpCache k8s.EndpointsCache,
) error {
deployWatchIface, err := deplCache.Watch(q.interceptorNS, q.interceptorDeplName)
endpoWatchIface, err := endpCache.Watch(q.interceptorNS, q.interceptorServiceName)
if err != nil {
return err
}
deployEvtChan := deployWatchIface.ResultChan()
defer deployWatchIface.Stop()
endpEvtChan := endpoWatchIface.ResultChan()
defer endpoWatchIface.Stop()

lggr := q.lggr.WithName("scaler.queuePinger.start")
defer ticker.Stop()
Expand All @@ -102,13 +102,13 @@ func (q *queuePinger) start(
return fmt.Errorf("error getting request counts: %w", err)
}
// handle changes to the interceptor fleet
// Deployment
case <-deployEvtChan:
// Endpoints
case <-endpEvtChan:
err := q.fetchAndSaveCounts(ctx)
if err != nil {
lggr.Error(
err,
"getting request counts after interceptor deployment event",
"getting request counts after interceptor endpoints event",
)
}
}
Expand Down
2 changes: 1 addition & 1 deletion scaler/queue_pinger_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ func TestCounts(t *testing.T) {
r.NoError(q.Resize("host3", 3))
r.NoError(q.Resize("host4", 4))
ticker := time.NewTicker(tickDur)
fakeCache := k8s.NewFakeDeploymentCache()
fakeCache := k8s.NewFakeEndpointsCache()
go func() {
_ = pinger.start(ctx, ticker, fakeCache)
}()
Expand Down

0 comments on commit 45a887e

Please sign in to comment.