diff --git a/example-disable-clusterip-routing.yaml b/example-disable-clusterip-routing.yaml new file mode 100644 index 000000000000..b3a41aebdaaa --- /dev/null +++ b/example-disable-clusterip-routing.yaml @@ -0,0 +1,20 @@ +apiVersion: serving.knative.dev/v1 +kind: Service +metadata: + name: example-service +spec: + template: + metadata: + annotations: + # Disable ClusterIP routing for this revision + # When set to "true", the activator will only use pod IP routing + serving.knative.dev/disable-clusterip-routing: "true" + spec: + containers: + - image: gcr.io/knative-samples/helloworld-go + ports: + - containerPort: 8080 + env: + - name: TARGET + value: "Go Sample v1" + diff --git a/pkg/activator/net/throttler.go b/pkg/activator/net/throttler.go index 0ef298c48db4..816617e9cede 100644 --- a/pkg/activator/net/throttler.go +++ b/pkg/activator/net/throttler.go @@ -132,6 +132,7 @@ type revisionThrottler struct { revID types.NamespacedName containerConcurrency int lbPolicy lbPolicy + disableClusterIP bool // These are used in slicing to infer which pods to assign // to this activator. @@ -171,6 +172,7 @@ type revisionThrottler struct { func newRevisionThrottler(revID types.NamespacedName, containerConcurrency int, proto string, breakerParams queue.BreakerParams, + disableClusterIP bool, logger *zap.SugaredLogger, ) *revisionThrottler { logger = logger.With(zap.String(logkey.Key, revID.String())) @@ -198,6 +200,7 @@ func newRevisionThrottler(revID types.NamespacedName, logger: logger, protocol: proto, lbPolicy: lbp, + disableClusterIP: disableClusterIP, } // Start with unknown @@ -414,10 +417,11 @@ func (rt *revisionThrottler) handleUpdate(update revisionDestsUpdate) { rt.logger.Debugw("Handling update", zap.String("ClusterIP", update.ClusterIPDest), zap.Object("dests", logging.StringSet(update.Dests))) - // ClusterIP is not yet ready, so we want to send requests directly to the pods. 
+ // If ClusterIP routing is disabled OR ClusterIP is not yet ready, + // we want to send requests directly to the pods. // NB: this will not be called in parallel, thus we can build a new podTrackers // array before taking out a lock. - if update.ClusterIPDest == "" { + if rt.disableClusterIP || update.ClusterIPDest == "" { // Create a map for fast lookup of existing trackers. trackersMap := make(map[string]*podTracker, len(rt.podTrackers)) for _, tracker := range rt.podTrackers { @@ -546,11 +550,16 @@ func (t *Throttler) getOrCreateRevisionThrottler(revID types.NamespacedName) (*r if err != nil { return nil, err } + // Check if ClusterIP routing is disabled via annotation + disableClusterIP := rev.Annotations != nil && + rev.Annotations[serving.DisableClusterIPRoutingAnnotationKey] == "true" + revThrottler = newRevisionThrottler( revID, int(rev.Spec.GetContainerConcurrency()), pkgnet.ServicePortName(rev.GetProtocol()), queue.BreakerParams{QueueDepth: breakerQueueDepth, MaxConcurrency: revisionMaxConcurrency}, + disableClusterIP, t.logger, ) t.revisionThrottlers[revID] = revThrottler diff --git a/pkg/activator/net/throttler_test.go b/pkg/activator/net/throttler_test.go index ed727a26c79b..4dccb8f0edb5 100644 --- a/pkg/activator/net/throttler_test.go +++ b/pkg/activator/net/throttler_test.go @@ -618,7 +618,7 @@ func TestPodAssignmentFinite(t *testing.T) { defer cancel() throttler := newTestThrottler(ctx) - rt := newRevisionThrottler(revName, 42 /*cc*/, pkgnet.ServicePortNameHTTP1, testBreakerParams, logger) + rt := newRevisionThrottler(revName, 42 /*cc*/, pkgnet.ServicePortNameHTTP1, testBreakerParams, false /*disableClusterIP*/, logger) rt.numActivators.Store(4) rt.activatorIndex.Store(0) throttler.revisionThrottlers[revName] = rt @@ -670,7 +670,7 @@ func TestPodAssignmentInfinite(t *testing.T) { defer cancel() throttler := newTestThrottler(ctx) - rt := newRevisionThrottler(revName, 0 /*cc*/, pkgnet.ServicePortNameHTTP1, testBreakerParams, logger) + rt := 
newRevisionThrottler(revName, 0 /*cc*/, pkgnet.ServicePortNameHTTP1, testBreakerParams, false /*disableClusterIP*/, logger) throttler.revisionThrottlers[revName] = rt update := revisionDestsUpdate{ @@ -902,12 +902,87 @@ func TestMultipleActivators(t *testing.T) { func TestInfiniteBreakerCreation(t *testing.T) { // This test verifies that we use infiniteBreaker when CC==0. tttl := newRevisionThrottler(types.NamespacedName{Namespace: "a", Name: "b"}, 0, /*cc*/ - pkgnet.ServicePortNameHTTP1, queue.BreakerParams{}, TestLogger(t)) + pkgnet.ServicePortNameHTTP1, queue.BreakerParams{}, false /*disableClusterIP*/, TestLogger(t)) if _, ok := tttl.breaker.(*infiniteBreaker); !ok { t.Errorf("The type of revisionBreaker = %T, want %T", tttl, (*infiniteBreaker)(nil)) } } +func TestDisableClusterIPRouting(t *testing.T) { + // Test that when disableClusterIP is true, we always use pod trackers even when ClusterIP is available + logger := TestLogger(t) + revName := types.NamespacedName{Namespace: testNamespace, Name: testRevision} + + // Create a revision throttler with ClusterIP disabled + rt := newRevisionThrottler(revName, 42 /*cc*/, pkgnet.ServicePortNameHTTP1, + testBreakerParams, true /*disableClusterIP*/, logger) + + // Update with both ClusterIP and pod destinations + update := revisionDestsUpdate{ + Rev: revName, + ClusterIPDest: "10.0.0.1:80", // ClusterIP is available + Dests: sets.New("ip1", "ip2", "ip3"), + } + + rt.handleUpdate(update) + + // Verify that we're using pod trackers, not ClusterIP + rt.mux.RLock() + defer rt.mux.RUnlock() + + if rt.clusterIPTracker != nil { + t.Error("Expected clusterIPTracker to be nil when ClusterIP routing is disabled") + } + + if len(rt.podTrackers) != 3 { + t.Errorf("Expected 3 pod trackers, got %d", len(rt.podTrackers)) + } + + // Verify pod trackers have correct destinations + expectedDests := sets.New("ip1", "ip2", "ip3") + for _, tracker := range rt.podTrackers { + if !expectedDests.Has(tracker.dest) { + t.Errorf("Unexpected pod 
tracker destination: %s", tracker.dest) + } + expectedDests.Delete(tracker.dest) + } +} + +func TestEnableClusterIPRoutingByDefault(t *testing.T) { + // Test that when disableClusterIP is false (default), we use ClusterIP when available + logger := TestLogger(t) + revName := types.NamespacedName{Namespace: testNamespace, Name: testRevision} + + // Create a revision throttler with ClusterIP enabled (default) + rt := newRevisionThrottler(revName, 42 /*cc*/, pkgnet.ServicePortNameHTTP1, + testBreakerParams, false /*disableClusterIP*/, logger) + + // Update with both ClusterIP and pod destinations + update := revisionDestsUpdate{ + Rev: revName, + ClusterIPDest: "10.0.0.1:80", // ClusterIP is available + Dests: sets.New("ip1", "ip2", "ip3"), + } + + rt.handleUpdate(update) + + // Verify that we're using ClusterIP, not pod trackers + rt.mux.RLock() + defer rt.mux.RUnlock() + + if rt.clusterIPTracker == nil { + t.Fatal("Expected clusterIPTracker to be set when ClusterIP routing is enabled") // Fatal: the dest check below would dereference a nil tracker + } + + if rt.clusterIPTracker.dest != "10.0.0.1:80" { + t.Errorf("Expected clusterIPTracker dest to be '10.0.0.1:80', got %s", rt.clusterIPTracker.dest) + } + + if rt.podTrackers != nil { + t.Error("Expected podTrackers to be nil when using ClusterIP") + } +} + func (t *Throttler) try(ctx context.Context, requests int, try func(string) error) chan tryResult { resultChan := make(chan tryResult) diff --git a/pkg/apis/serving/register.go b/pkg/apis/serving/register.go index 7fc51964f1e7..5c81290a422b 100644 --- a/pkg/apis/serving/register.go +++ b/pkg/apis/serving/register.go @@ -144,6 +144,11 @@ const ( // ProgressDeadlineAnnotationKey is the label key for the per revision progress deadline to set for the deployment ProgressDeadlineAnnotationKey = GroupName + "/progress-deadline" + + // DisableClusterIPRoutingAnnotationKey is the annotation key to disable ClusterIP routing for a revision + // When set to "true", the activator will only use pod IP routing and not use ClusterIP for load 
balancing. + // By default, ClusterIP routing is enabled. + DisableClusterIPRoutingAnnotationKey = GroupName + "/disable-clusterip-routing" ) var ( diff --git a/pkg/autoscaler/metrics/stat.pb.go b/pkg/autoscaler/metrics/stat.pb.go index 99df6f71699d..58024c8ed1a5 100644 --- a/pkg/autoscaler/metrics/stat.pb.go +++ b/pkg/autoscaler/metrics/stat.pb.go @@ -22,10 +22,11 @@ package metrics import ( encoding_binary "encoding/binary" fmt "fmt" - proto "github.com/gogo/protobuf/proto" io "io" math "math" math_bits "math/bits" + + proto "github.com/gogo/protobuf/proto" ) // Reference imports to suppress errors if they are not otherwise used. diff --git a/test/test_images/grpc-ping/proto/ping.pb.go b/test/test_images/grpc-ping/proto/ping.pb.go index f1cf30f9c35a..5dedf9ffbf02 100644 --- a/test/test_images/grpc-ping/proto/ping.pb.go +++ b/test/test_images/grpc-ping/proto/ping.pb.go @@ -22,13 +22,14 @@ package ping import ( context "context" fmt "fmt" + io "io" + math "math" + math_bits "math/bits" + proto "github.com/gogo/protobuf/proto" grpc "google.golang.org/grpc" codes "google.golang.org/grpc/codes" status "google.golang.org/grpc/status" - io "io" - math "math" - math_bits "math/bits" ) // Reference imports to suppress errors if they are not otherwise used.