Skip to content

Commit

Permalink
Add interceptor
Browse files Browse the repository at this point in the history
Signed-off-by: Jorge Turrado <[email protected]>
  • Loading branch information
JorTurFer committed Dec 9, 2023
1 parent 45a887e commit d13d0e9
Show file tree
Hide file tree
Showing 17 changed files with 483 additions and 543 deletions.
7 changes: 6 additions & 1 deletion interceptor/config/serving.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,16 @@ type Serving struct {
// ConfigMapCacheRsyncPeriod is the time interval
// for the configmap informer to rsync the local cache.
ConfigMapCacheRsyncPeriod time.Duration `envconfig:"KEDA_HTTP_SCALER_CONFIG_MAP_INFORMER_RSYNC_PERIOD" default:"60m"`
// The interceptor has an internal process that periodically fetches the state
// DEPRECATED: The interceptor has an internal process that periodically fetches the state
// of deployment that is running the servers it forwards to.
//
// This is the interval (in milliseconds) representing how often to do a fetch
DeploymentCachePollIntervalMS int `envconfig:"KEDA_HTTP_DEPLOYMENT_CACHE_POLLING_INTERVAL_MS" default:"250"`
//The interceptor has an internal process that periodically fetches the state
// of endpoints that is running the servers it forwards to.
//
// This is the interval (in milliseconds) representing how often to do a fetch
EndpointsCachePollIntervalMS int `envconfig:"KEDA_HTTP_ENDPOINTS_CACHE_POLLING_INTERVAL_MS" default:"250"`
}

// Parse parses standard configs using envconfig and returns a pointer to the
Expand Down
4 changes: 2 additions & 2 deletions interceptor/config/timeouts.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@ type Timeouts struct {
// ResponseHeaderTimeout is how long to wait between when the HTTP request
// is sent to the backing app and when response headers need to arrive
ResponseHeader time.Duration `envconfig:"KEDA_RESPONSE_HEADER_TIMEOUT" default:"500ms"`
// DeploymentReplicas is how long to wait for the backing deployment
// WorkloadReplicas is how long to wait for the backing workload
// to have 1 or more replicas before connecting and sending the HTTP request.
DeploymentReplicas time.Duration `envconfig:"KEDA_CONDITION_WAIT_TIMEOUT" default:"1500ms"`
WorkloadReplicas time.Duration `envconfig:"KEDA_CONDITION_WAIT_TIMEOUT" default:"1500ms"`
// ForceHTTP2 toggles whether to try to force HTTP2 for all requests
ForceHTTP2 bool `envconfig:"KEDA_HTTP_FORCE_HTTP2" default:"false"`
// MaxIdleConns is the max number of connections that can be idle in the
Expand Down
32 changes: 26 additions & 6 deletions interceptor/config/validate.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,36 @@ package config

import (
"fmt"
"os"
"time"

"github.com/go-logr/logr"
)

func Validate(srvCfg Serving, timeoutsCfg Timeouts) error {
deplCachePollInterval := time.Duration(srvCfg.DeploymentCachePollIntervalMS) * time.Millisecond
if timeoutsCfg.DeploymentReplicas < deplCachePollInterval {
func Validate(srvCfg *Serving, timeoutsCfg Timeouts, lggr logr.Logger) error {
// TODO(jorturfer): delete this on v0.8.0
_, deploymentEnvExist := os.LookupEnv("KEDA_HTTP_DEPLOYMENT_CACHE_POLLING_INTERVAL_MS")
_, endpointsEnvExist := os.LookupEnv("KEDA_HTTP_ENDPOINTS_CACHE_POLLING_INTERVAL_MS")
if deploymentEnvExist && endpointsEnvExist {
return fmt.Errorf(
"%s and %s are mutual exclusive",
"KEDA_HTTP_DEPLOYMENT_CACHE_POLLING_INTERVAL_MS",
"KEDA_HTTP_ENDPOINTS_CACHE_POLLING_INTERVAL_MS",
)
}
if deploymentEnvExist && !endpointsEnvExist {
srvCfg.EndpointsCachePollIntervalMS = srvCfg.DeploymentCachePollIntervalMS
srvCfg.DeploymentCachePollIntervalMS = 0
lggr.Info("WARNING: KEDA_HTTP_DEPLOYMENT_CACHE_POLLING_INTERVAL_MS has been deprecated in favor of KEDA_HTTP_ENDPOINTS_CACHE_POLLING_INTERVAL_MS and wil be removed on v0.8.0")
}
// END TODO

endpointsCachePollInterval := time.Duration(srvCfg.EndpointsCachePollIntervalMS) * time.Millisecond
if timeoutsCfg.WorkloadReplicas < endpointsCachePollInterval {
return fmt.Errorf(
"deployment replicas timeout (%s) should not be less than the Deployment Cache Poll Interval (%s)",
timeoutsCfg.DeploymentReplicas,
deplCachePollInterval,
"workload replicas timeout (%s) should not be less than the Endpoints Cache Poll Interval (%s)",
timeoutsCfg.WorkloadReplicas,
endpointsCachePollInterval,
)
}
return nil
Expand Down
56 changes: 30 additions & 26 deletions interceptor/forward_wait_func.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,65 +5,69 @@ import (
"fmt"

"github.com/go-logr/logr"
appsv1 "k8s.io/api/apps/v1"
v1 "k8s.io/api/core/v1"

"github.com/kedacore/http-add-on/pkg/k8s"
)

// forwardWaitFunc is a function that waits for a condition
// before proceeding to serve the request.
type forwardWaitFunc func(context.Context, string, string) (int, error)
type forwardWaitFunc func(context.Context, string, string) (bool, error)

func deploymentCanServe(depl appsv1.Deployment) bool {
return depl.Status.ReadyReplicas > 0
func workloadActiveEndpoints(endpoints v1.Endpoints) int {
total := 0
for _, subset := range endpoints.Subsets {
total += len(subset.Addresses)
}
return total
}

func newDeployReplicasForwardWaitFunc(
func newWorkloadReplicasForwardWaitFunc(
lggr logr.Logger,
deployCache k8s.DeploymentCache,
endpointCache k8s.EndpointsCache,
) forwardWaitFunc {
return func(ctx context.Context, deployNS, deployName string) (int, error) {
return func(ctx context.Context, endpointNS, endpointName string) (bool, error) {
// get a watcher & its result channel before querying the
// deployment cache, to ensure we don't miss events
watcher, err := deployCache.Watch(deployNS, deployName)
// endpoints cache, to ensure we don't miss events
watcher, err := endpointCache.Watch(endpointNS, endpointName)
if err != nil {
return 0, err
return false, err
}
eventCh := watcher.ResultChan()
defer watcher.Stop()

deployment, err := deployCache.Get(deployNS, deployName)
endpoints, err := endpointCache.Get(endpointNS, endpointName)
if err != nil {
// if we didn't get the initial deployment state, bail out
return 0, fmt.Errorf(
"error getting state for deployment %s/%s: %w",
deployNS,
deployName,
// if we didn't get the initial endpoints state, bail out
return false, fmt.Errorf(
"error getting state for endpoints %s/%s: %w",
endpointNS,
endpointName,
err,
)
}
// if there is 1 or more replica, we're done waiting
if deploymentCanServe(deployment) {
return int(deployment.Status.ReadyReplicas), nil
// if there is 1 or more active endpoints, we're done waiting
activeEndpoints := workloadActiveEndpoints(endpoints)
if activeEndpoints > 0 {
return false, nil
}

for {
select {
case event := <-eventCh:
deployment, ok := event.Object.(*appsv1.Deployment)
endpoints, ok := event.Object.(*v1.Endpoints)
if !ok {
lggr.Info(
"Didn't get a deployment back in event",
"Didn't get a endpoints back in event",
)
} else if deploymentCanServe(*deployment) {
return 0, nil
} else if activeEndpoints := workloadActiveEndpoints(*endpoints); activeEndpoints > 0 {
return true, nil
}
case <-ctx.Done():
// otherwise, if the context is marked done before
// we're done waiting, fail.
return 0, fmt.Errorf(
"context marked done while waiting for deployment %s to reach > 0 replicas: %w",
deployName,
return false, fmt.Errorf(
"context marked done while waiting for workload reach > 0 replicas: %w",
ctx.Err(),
)
}
Expand Down
146 changes: 44 additions & 102 deletions interceptor/forward_wait_func_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,79 +8,63 @@ import (
"github.com/go-logr/logr"
"github.com/stretchr/testify/require"
"golang.org/x/sync/errgroup"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/watch"

"github.com/kedacore/http-add-on/pkg/k8s"
)

// Test to make sure the wait function returns a nil error if there is immediately
// one replica on the target deployment
// one active endpoint on the target deployment
func TestForwardWaitFuncOneReplica(t *testing.T) {
ctx := context.Background()

const waitFuncWait = 1 * time.Second
r := require.New(t)
const ns = "testNS"
const deployName = "TestForwardingHandlerDeploy"
cache := k8s.NewFakeDeploymentCache()
cache.AddDeployment(*newDeployment(
ns,
deployName,
"myimage",
[]int32{123},
nil,
map[string]string{},
corev1.PullAlways,
))
const endpointsName = "TestForwardingHandler"
endpoints := *newEndpoint(ns, endpointsName)
cache := k8s.NewFakeEndpointsCache()
cache.Set(endpoints)
cache.SetSubsets(ns, endpointsName, 1)

ctx, done := context.WithTimeout(ctx, waitFuncWait)
defer done()
group, ctx := errgroup.WithContext(ctx)

waitFunc := newDeployReplicasForwardWaitFunc(
waitFunc := newWorkloadReplicasForwardWaitFunc(
logr.Discard(),
cache,
)

group.Go(func() error {
_, err := waitFunc(ctx, ns, deployName)
_, err := waitFunc(ctx, ns, endpointsName)
return err
})
r.NoError(group.Wait(), "wait function failed, but it shouldn't have")
}

// Test to make sure the wait function returns an error if there are no replicas, and that doesn't change
// Test to make sure the wait function returns an error if there are active endpoints, and that doesn't change
// within a timeout
func TestForwardWaitFuncNoReplicas(t *testing.T) {
ctx := context.Background()
const waitFuncWait = 1 * time.Second
r := require.New(t)
const ns = "testNS"
const deployName = "TestForwardingHandlerHoldsDeployment"
deployment := newDeployment(
ns,
deployName,
"myimage",
[]int32{123},
nil,
map[string]string{},
corev1.PullAlways,
)
deployment.Status.ReadyReplicas = 0
cache := k8s.NewFakeDeploymentCache()
cache.AddDeployment(*deployment)
const endpointsName = "TestForwardWaitFuncNoReplicas"
endpoints := *newEndpoint(ns, endpointsName)
cache := k8s.NewFakeEndpointsCache()
cache.Set(endpoints)

ctx, done := context.WithTimeout(ctx, waitFuncWait)
defer done()
waitFunc := newDeployReplicasForwardWaitFunc(
waitFunc := newWorkloadReplicasForwardWaitFunc(
logr.Discard(),
cache,
)

_, err := waitFunc(ctx, ns, deployName)
_, err := waitFunc(ctx, ns, endpointsName)
r.Error(err)
}

Expand All @@ -90,100 +74,58 @@ func TestWaitFuncWaitsUntilReplicas(t *testing.T) {
totalWaitDur := 500 * time.Millisecond

const ns = "testNS"
const deployName = "TestForwardingHandlerHoldsDeployment"
deployment := newDeployment(
ns,
deployName,
"myimage",
[]int32{123},
nil,
map[string]string{},
corev1.PullAlways,
)
deployment.Spec.Replicas = k8s.Int32P(0)
cache := k8s.NewFakeDeploymentCache()
cache.AddDeployment(*deployment)
const endpointsName = "TestForwardingHandlerHolds"

endpoints := *newEndpoint(ns, endpointsName)
cache := k8s.NewFakeEndpointsCache()
cache.Set(endpoints)
// create a watcher first so that the goroutine
// can later fetch it and send a message on it
_, err := cache.Watch(ns, deployName)
_, err := cache.Watch(ns, endpointsName)
r.NoError(err)

ctx, done := context.WithTimeout(ctx, totalWaitDur)
waitFunc := newDeployReplicasForwardWaitFunc(
waitFunc := newWorkloadReplicasForwardWaitFunc(
logr.Discard(),
cache,
)

// this channel will be closed immediately after the replicas were increased
replicasIncreasedCh := make(chan struct{})
// this channel will be closed immediately after the active endpoints were increased
activeEndpointsIncreasedCh := make(chan struct{})
go func() {
time.Sleep(totalWaitDur / 2)
watcher := cache.GetWatcher(ns, deployName)
watcher := cache.GetWatcher(ns, endpointsName)
r.NotNil(watcher, "watcher was not found")
modifiedDeployment := deployment.DeepCopy()
modifiedDeployment.Spec.Replicas = k8s.Int32P(1)
watcher.Action(watch.Modified, modifiedDeployment)
close(replicasIncreasedCh)
modifiedEndpoints := endpoints.DeepCopy()
modifiedEndpoints.Subsets = []v1.EndpointSubset{
{
Addresses: []v1.EndpointAddress{
{IP: "1.2.3.4"},
},
},
}
watcher.Action(watch.Modified, modifiedEndpoints)
close(activeEndpointsIncreasedCh)
}()
_, err = waitFunc(ctx, ns, deployName)
_, err = waitFunc(ctx, ns, endpointsName)
r.NoError(err)
done()
}

// newDeployment creates a new deployment object
// newEndpoint creates a new endpoints object
// with the given name and the given image. This does not actually create
// the deployment in the cluster, it just creates the deployment object
// the endpoints in the cluster, it just creates the endpoints object
// in memory
func newDeployment(
func newEndpoint(
namespace,
name,
image string,
ports []int32,
env []corev1.EnvVar,
labels map[string]string,
pullPolicy corev1.PullPolicy,
) *appsv1.Deployment {
containerPorts := make([]corev1.ContainerPort, len(ports))
for i, port := range ports {
containerPorts[i] = corev1.ContainerPort{
ContainerPort: port,
}
}
deployment := &appsv1.Deployment{
TypeMeta: metav1.TypeMeta{
Kind: "Deployment",
},
name string,
) *v1.Endpoints {
endpoints := &v1.Endpoints{
ObjectMeta: metav1.ObjectMeta{
Name: name,
Namespace: namespace,
Labels: labels,
},
Spec: appsv1.DeploymentSpec{
Selector: &metav1.LabelSelector{
MatchLabels: labels,
},
Replicas: k8s.Int32P(1),
Template: corev1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Labels: labels,
},
Spec: corev1.PodSpec{
Containers: []corev1.Container{
{
Image: image,
Name: name,
ImagePullPolicy: pullPolicy,
Ports: containerPorts,
Env: env,
},
},
},
},
},
Status: appsv1.DeploymentStatus{
ReadyReplicas: 1,
},
}

return deployment
return endpoints
}
Loading

0 comments on commit d13d0e9

Please sign in to comment.