Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions example-disable-clusterip-routing.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
apiVersion: serving.knative.dev/v1
kind: Service
metadata:
name: example-service
spec:
template:
metadata:
annotations:
# Disable ClusterIP routing for this revision
# When set to "true", the activator will only use pod IP routing
serving.knative.dev/disable-clusterip-routing: "true"
spec:
containers:
- image: gcr.io/knative-samples/helloworld-go
ports:
- containerPort: 8080
env:
- name: TARGET
value: "Go Sample v1"

13 changes: 11 additions & 2 deletions pkg/activator/net/throttler.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@ type revisionThrottler struct {
revID types.NamespacedName
containerConcurrency int
lbPolicy lbPolicy
disableClusterIP bool

// These are used in slicing to infer which pods to assign
// to this activator.
Expand Down Expand Up @@ -171,6 +172,7 @@ type revisionThrottler struct {
func newRevisionThrottler(revID types.NamespacedName,
containerConcurrency int, proto string,
breakerParams queue.BreakerParams,
disableClusterIP bool,
logger *zap.SugaredLogger,
) *revisionThrottler {
logger = logger.With(zap.String(logkey.Key, revID.String()))
Expand Down Expand Up @@ -198,6 +200,7 @@ func newRevisionThrottler(revID types.NamespacedName,
logger: logger,
protocol: proto,
lbPolicy: lbp,
disableClusterIP: disableClusterIP,
}

// Start with unknown
Expand Down Expand Up @@ -414,10 +417,11 @@ func (rt *revisionThrottler) handleUpdate(update revisionDestsUpdate) {
rt.logger.Debugw("Handling update",
zap.String("ClusterIP", update.ClusterIPDest), zap.Object("dests", logging.StringSet(update.Dests)))

// ClusterIP is not yet ready, so we want to send requests directly to the pods.
// If ClusterIP routing is disabled OR ClusterIP is not yet ready,
// we want to send requests directly to the pods.
// NB: this will not be called in parallel, thus we can build a new podTrackers
// array before taking out a lock.
if update.ClusterIPDest == "" {
if rt.disableClusterIP || update.ClusterIPDest == "" {
// Create a map for fast lookup of existing trackers.
trackersMap := make(map[string]*podTracker, len(rt.podTrackers))
for _, tracker := range rt.podTrackers {
Expand Down Expand Up @@ -546,11 +550,16 @@ func (t *Throttler) getOrCreateRevisionThrottler(revID types.NamespacedName) (*r
if err != nil {
return nil, err
}
// Check if ClusterIP routing is disabled via annotation
disableClusterIP := rev.Annotations != nil &&
rev.Annotations[serving.DisableClusterIPRoutingAnnotationKey] == "true"

revThrottler = newRevisionThrottler(
revID,
int(rev.Spec.GetContainerConcurrency()),
pkgnet.ServicePortName(rev.GetProtocol()),
queue.BreakerParams{QueueDepth: breakerQueueDepth, MaxConcurrency: revisionMaxConcurrency},
disableClusterIP,
t.logger,
)
t.revisionThrottlers[revID] = revThrottler
Expand Down
81 changes: 78 additions & 3 deletions pkg/activator/net/throttler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -618,7 +618,7 @@ func TestPodAssignmentFinite(t *testing.T) {
defer cancel()

throttler := newTestThrottler(ctx)
rt := newRevisionThrottler(revName, 42 /*cc*/, pkgnet.ServicePortNameHTTP1, testBreakerParams, logger)
rt := newRevisionThrottler(revName, 42 /*cc*/, pkgnet.ServicePortNameHTTP1, testBreakerParams, false /*disableClusterIP*/, logger)
rt.numActivators.Store(4)
rt.activatorIndex.Store(0)
throttler.revisionThrottlers[revName] = rt
Expand Down Expand Up @@ -670,7 +670,7 @@ func TestPodAssignmentInfinite(t *testing.T) {
defer cancel()

throttler := newTestThrottler(ctx)
rt := newRevisionThrottler(revName, 0 /*cc*/, pkgnet.ServicePortNameHTTP1, testBreakerParams, logger)
rt := newRevisionThrottler(revName, 0 /*cc*/, pkgnet.ServicePortNameHTTP1, testBreakerParams, false /*disableClusterIP*/, logger)
throttler.revisionThrottlers[revName] = rt

update := revisionDestsUpdate{
Expand Down Expand Up @@ -902,12 +902,87 @@ func TestMultipleActivators(t *testing.T) {
func TestInfiniteBreakerCreation(t *testing.T) {
// This test verifies that we use infiniteBreaker when CC==0.
tttl := newRevisionThrottler(types.NamespacedName{Namespace: "a", Name: "b"}, 0, /*cc*/
pkgnet.ServicePortNameHTTP1, queue.BreakerParams{}, TestLogger(t))
pkgnet.ServicePortNameHTTP1, queue.BreakerParams{}, false /*disableClusterIP*/, TestLogger(t))
if _, ok := tttl.breaker.(*infiniteBreaker); !ok {
t.Errorf("The type of revisionBreaker = %T, want %T", tttl, (*infiniteBreaker)(nil))
}
}

func TestDisableClusterIPRouting(t *testing.T) {
// Test that when disableClusterIP is true, we always use pod trackers even when ClusterIP is available
logger := TestLogger(t)
revName := types.NamespacedName{Namespace: testNamespace, Name: testRevision}

// Create a revision throttler with ClusterIP disabled
rt := newRevisionThrottler(revName, 42 /*cc*/, pkgnet.ServicePortNameHTTP1,
testBreakerParams, true /*disableClusterIP*/, logger)

// Update with both ClusterIP and pod destinations
update := revisionDestsUpdate{
Rev: revName,
ClusterIPDest: "10.0.0.1:80", // ClusterIP is available
Dests: sets.New("ip1", "ip2", "ip3"),
}

rt.handleUpdate(update)

// Verify that we're using pod trackers, not ClusterIP
rt.mux.RLock()
defer rt.mux.RUnlock()

if rt.clusterIPTracker != nil {
t.Error("Expected clusterIPTracker to be nil when ClusterIP routing is disabled")
}

if len(rt.podTrackers) != 3 {
t.Errorf("Expected 3 pod trackers, got %d", len(rt.podTrackers))
}

// Verify pod trackers have correct destinations
expectedDests := sets.New("ip1", "ip2", "ip3")
for _, tracker := range rt.podTrackers {
if !expectedDests.Has(tracker.dest) {
t.Errorf("Unexpected pod tracker destination: %s", tracker.dest)
}
expectedDests.Delete(tracker.dest)
}
}

func TestEnableClusterIPRoutingByDefault(t *testing.T) {
// Test that when disableClusterIP is false (default), we use ClusterIP when available
logger := TestLogger(t)
revName := types.NamespacedName{Namespace: testNamespace, Name: testRevision}

// Create a revision throttler with ClusterIP enabled (default)
rt := newRevisionThrottler(revName, 42 /*cc*/, pkgnet.ServicePortNameHTTP1,
testBreakerParams, false /*disableClusterIP*/, logger)

// Update with both ClusterIP and pod destinations
update := revisionDestsUpdate{
Rev: revName,
ClusterIPDest: "10.0.0.1:80", // ClusterIP is available
Dests: sets.New("ip1", "ip2", "ip3"),
}

rt.handleUpdate(update)

// Verify that we're using ClusterIP, not pod trackers
rt.mux.RLock()
defer rt.mux.RUnlock()

if rt.clusterIPTracker == nil {
t.Error("Expected clusterIPTracker to be set when ClusterIP routing is enabled")
}

if rt.clusterIPTracker.dest != "10.0.0.1:80" {
t.Errorf("Expected clusterIPTracker dest to be '10.0.0.1:80', got %s", rt.clusterIPTracker.dest)
}

if rt.podTrackers != nil {
t.Error("Expected podTrackers to be nil when using ClusterIP")
}
}

func (t *Throttler) try(ctx context.Context, requests int, try func(string) error) chan tryResult {
resultChan := make(chan tryResult)

Expand Down
5 changes: 5 additions & 0 deletions pkg/apis/serving/register.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,11 @@ const (

// ProgressDeadlineAnnotationKey is the label key for the per revision progress deadline to set for the deployment
ProgressDeadlineAnnotationKey = GroupName + "/progress-deadline"

// DisableClusterIPRoutingAnnotationKey is the annotation key to disable ClusterIP routing for a revision
// When set to "true", the activator will only use pod IP routing and not use ClusterIP for load balancing.
// By default, ClusterIP routing is enabled.
DisableClusterIPRoutingAnnotationKey = GroupName + "/disable-clusterip-routing"
)

var (
Expand Down
3 changes: 2 additions & 1 deletion pkg/autoscaler/metrics/stat.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 4 additions & 3 deletions test/test_images/grpc-ping/proto/ping.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading