diff --git a/controller/api/destination/endpoint_profile_translator.go b/controller/api/destination/endpoint_profile_translator.go index c5aeef331c151..f651f3cafc290 100644 --- a/controller/api/destination/endpoint_profile_translator.go +++ b/controller/api/destination/endpoint_profile_translator.go @@ -12,7 +12,8 @@ import ( ) type endpointProfileTranslator struct { - enableH2Upgrade bool + forceOpaqueTransport, + enableH2Upgrade bool controllerNS string identityTrustDomain string defaultOpaquePorts map[uint32]struct{} @@ -44,6 +45,7 @@ var endpointProfileUpdatesQueueOverflowCounter = promauto.NewCounter( // newEndpointProfileTranslator translates pod updates and profile updates to // DestinationProfiles for endpoints func newEndpointProfileTranslator( + forceOpaqueTransport bool, enableH2Upgrade bool, controllerNS, identityTrustDomain string, @@ -54,10 +56,11 @@ func newEndpointProfileTranslator( log *logging.Entry, ) *endpointProfileTranslator { return &endpointProfileTranslator{ - enableH2Upgrade: enableH2Upgrade, - controllerNS: controllerNS, - identityTrustDomain: identityTrustDomain, - defaultOpaquePorts: defaultOpaquePorts, + forceOpaqueTransport: forceOpaqueTransport, + enableH2Upgrade: enableH2Upgrade, + controllerNS: controllerNS, + identityTrustDomain: identityTrustDomain, + defaultOpaquePorts: defaultOpaquePorts, meshedHttp2ClientParams: meshedHTTP2ClientParams, @@ -158,10 +161,10 @@ func (ept *endpointProfileTranslator) createEndpoint(address watcher.Address, op var weightedAddr *pb.WeightedAddr var err error if address.ExternalWorkload != nil { - weightedAddr, err = createWeightedAddrForExternalWorkload(address, opaquePorts, ept.meshedHttp2ClientParams) + weightedAddr, err = createWeightedAddrForExternalWorkload(address, ept.forceOpaqueTransport, opaquePorts, ept.meshedHttp2ClientParams) } else { weightedAddr, err = createWeightedAddr(address, opaquePorts, - ept.enableH2Upgrade, ept.identityTrustDomain, ept.controllerNS, ept.meshedHttp2ClientParams) + ept.forceOpaqueTransport, ept.enableH2Upgrade, ept.identityTrustDomain, ept.controllerNS, ept.meshedHttp2ClientParams) } if err != nil { return nil, err diff --git a/controller/api/destination/endpoint_profile_translator_test.go b/controller/api/destination/endpoint_profile_translator_test.go index 5007e4a61812b..3d4e0b2964529 100644 --- a/controller/api/destination/endpoint_profile_translator_test.go +++ b/controller/api/destination/endpoint_profile_translator_test.go @@ -40,7 +40,7 @@ func TestEndpointProfileTranslator(t *testing.T) { } log := logging.WithField("test", t.Name()) translator := newEndpointProfileTranslator( - true, "cluster", "identity", make(map[uint32]struct{}), nil, + false, true, "cluster", "identity", make(map[uint32]struct{}), nil, mockGetProfileServer, nil, log, @@ -84,7 +84,7 @@ func TestEndpointProfileTranslator(t *testing.T) { log := logging.WithField("test", t.Name()) endStream := make(chan struct{}) translator := newEndpointProfileTranslator( - true, "cluster", "identity", make(map[uint32]struct{}), nil, + false, true, "cluster", "identity", make(map[uint32]struct{}), nil, mockGetProfileServer, endStream, log, diff --git a/controller/api/destination/endpoint_translator.go b/controller/api/destination/endpoint_translator.go index 628480632f708..b88fc8fe32a40 100644 --- a/controller/api/destination/endpoint_translator.go +++ b/controller/api/destination/endpoint_translator.go @@ -38,6 +38,7 @@ type ( nodeName string defaultOpaquePorts map[uint32]struct{} + forceOpaqueTransport, enableH2Upgrade, enableEndpointFiltering, enableIPv6, @@ -83,6 +84,7 @@ var updatesQueueOverflowCounter = promauto.NewCounterVec( func newEndpointTranslator( controllerNS string, identityTrustDomain string, + forceOpaqueTransport, enableH2Upgrade, enableEndpointFiltering, enableIPv6, @@ -115,6 +117,7 @@ func newEndpointTranslator( nodeTopologyZone, srcNodeName, defaultOpaquePorts, + forceOpaqueTransport, enableH2Upgrade, enableEndpointFiltering, enableIPv6, @@ -409,14 +412,14 @@ func (et *endpointTranslator) sendClientAdd(set watcher.AddressSet) { if address.Pod != nil { opaquePorts = watcher.GetAnnotatedOpaquePorts(address.Pod, et.defaultOpaquePorts) wa, err = createWeightedAddr(address, opaquePorts, - et.enableH2Upgrade, et.identityTrustDomain, et.controllerNS, et.meshedHTTP2ClientParams) + et.forceOpaqueTransport, et.enableH2Upgrade, et.identityTrustDomain, et.controllerNS, et.meshedHTTP2ClientParams) if err != nil { et.log.Errorf("Failed to translate Pod endpoints to weighted addr: %s", err) continue } } else if address.ExternalWorkload != nil { opaquePorts = watcher.GetAnnotatedOpaquePortsForExternalWorkload(address.ExternalWorkload, et.defaultOpaquePorts) - wa, err = createWeightedAddrForExternalWorkload(address, opaquePorts, et.meshedHTTP2ClientParams) + wa, err = createWeightedAddrForExternalWorkload(address, et.forceOpaqueTransport, opaquePorts, et.meshedHTTP2ClientParams) if err != nil { et.log.Errorf("Failed to translate ExternalWorkload endpoints to weighted addr: %s", err) continue @@ -531,6 +534,7 @@ func toAddr(address watcher.Address) (*net.TcpAddress, error) { func createWeightedAddrForExternalWorkload( address watcher.Address, + forceOpaqueTransport bool, opaquePorts map[uint32]struct{}, http2 *pb.Http2ClientParams, ) (*pb.WeightedAddr, error) { @@ -556,21 +560,23 @@ func createWeightedAddrForExternalWorkload( weightedAddr.Http2 = http2 _, opaquePort := opaquePorts[address.Port] + opaquePort = opaquePort || address.OpaqueProtocol + + if forceOpaqueTransport || opaquePort { + port, err := getInboundPortFromExternalWorkload(&address.ExternalWorkload.Spec) + if err != nil { + return nil, fmt.Errorf("failed to read inbound port from external workload: %w", err) + } + weightedAddr.ProtocolHint.OpaqueTransport = &pb.ProtocolHint_OpaqueTransport{InboundPort: port} + } + // If address is set as opaque by a Server, or its port is set as // opaque by annotation or default value, then set the hinted protocol to // Opaque. - if address.OpaqueProtocol || opaquePort { + if opaquePort { weightedAddr.ProtocolHint.Protocol = &pb.ProtocolHint_Opaque_{ Opaque: &pb.ProtocolHint_Opaque{}, } - - port, err := getInboundPortFromExternalWorkload(&address.ExternalWorkload.Spec) - if err != nil { - return nil, fmt.Errorf("failed to read inbound port: %w", err) - } - weightedAddr.ProtocolHint.OpaqueTransport = &pb.ProtocolHint_OpaqueTransport{ - InboundPort: port, - } } else { weightedAddr.ProtocolHint.Protocol = &pb.ProtocolHint_H2_{ H2: &pb.ProtocolHint_H2{}, @@ -603,6 +609,7 @@ func createWeightedAddrForExternalWorkload( func createWeightedAddr( address watcher.Address, opaquePorts map[uint32]struct{}, + forceOpaqueTransport bool, enableH2Upgrade bool, identityTrustDomain string, controllerNS string, @@ -645,20 +652,22 @@ func createWeightedAddr( weightedAddr.ProtocolHint = &pb.ProtocolHint{} _, opaquePort := opaquePorts[address.Port] - // If address is set as opaque by a Server, or its port is set as - // opaque by annotation or default value, then set the hinted protocol to - // Opaque. - if address.OpaqueProtocol || opaquePort { - weightedAddr.ProtocolHint.Protocol = &pb.ProtocolHint_Opaque_{ - Opaque: &pb.ProtocolHint_Opaque{}, - } + opaquePort = opaquePort || address.OpaqueProtocol + if forceOpaqueTransport || opaquePort { port, err := getInboundPort(&address.Pod.Spec) if err != nil { return nil, fmt.Errorf("failed to read inbound port: %w", err) } - weightedAddr.ProtocolHint.OpaqueTransport = &pb.ProtocolHint_OpaqueTransport{ - InboundPort: port, + weightedAddr.ProtocolHint.OpaqueTransport = &pb.ProtocolHint_OpaqueTransport{InboundPort: port} + } + + // If address is set as opaque by a Server, or its port is set as + // opaque by annotation or default value, then set the hinted protocol to + // Opaque. + if opaquePort { + weightedAddr.ProtocolHint.Protocol = &pb.ProtocolHint_Opaque_{ + Opaque: &pb.ProtocolHint_Opaque{}, } } else if enableH2Upgrade { // If the pod is controlled by any Linkerd control plane, then it can be diff --git a/controller/api/destination/federated_service_watcher.go b/controller/api/destination/federated_service_watcher.go index 4cc356a05959d..5c385f26c8fba 100644 --- a/controller/api/destination/federated_service_watcher.go +++ b/controller/api/destination/federated_service_watcher.go @@ -351,6 +351,7 @@ func (fs *federatedService) remoteDiscoverySubscribe( translator := newEndpointTranslator( fs.config.ControllerNS, remoteConfig.TrustDomain, + fs.config.ForceOpaqueTransport, fs.config.EnableH2Upgrade, false, // Disable endpoint filtering for remote discovery. fs.config.EnableIPv6, @@ -399,6 +400,7 @@ func (fs *federatedService) localDiscoverySubscribe( translator := newEndpointTranslator( fs.config.ControllerNS, fs.config.IdentityTrustDomain, + fs.config.ForceOpaqueTransport, fs.config.EnableH2Upgrade, true, fs.config.EnableIPv6, diff --git a/controller/api/destination/server.go b/controller/api/destination/server.go index 637aeafcbc557..496ef003fc2af 100644 --- a/controller/api/destination/server.go +++ b/controller/api/destination/server.go @@ -30,6 +30,7 @@ type ( IdentityTrustDomain, ClusterDomain string + ForceOpaqueTransport, EnableH2Upgrade, EnableEndpointSlices, EnableIPv6, @@ -205,6 +206,7 @@ func (s *server) Get(dest *pb.GetDestination, stream pb.Destination_GetServer) e translator := newEndpointTranslator( s.config.ControllerNS, remoteConfig.TrustDomain, + s.config.ForceOpaqueTransport, s.config.EnableH2Upgrade, false, // Disable endpoint filtering for remote discovery. s.config.EnableIPv6, @@ -239,6 +241,7 @@ func (s *server) Get(dest *pb.GetDestination, stream pb.Destination_GetServer) e translator := newEndpointTranslator( s.config.ControllerNS, s.config.IdentityTrustDomain, + s.config.ForceOpaqueTransport, s.config.EnableH2Upgrade, true, s.config.EnableIPv6, @@ -531,6 +534,7 @@ func (s *server) subscribeToEndpointProfile( canceled := stream.Context().Done() streamEnd := make(chan struct{}) translator := newEndpointProfileTranslator( + s.config.ForceOpaqueTransport, s.config.EnableH2Upgrade, s.config.ControllerNS, s.config.IdentityTrustDomain, diff --git a/controller/api/destination/test_util.go b/controller/api/destination/test_util.go index e8e7573e0170d..0f3beff1062c4 100644 --- a/controller/api/destination/test_util.go +++ b/controller/api/destination/test_util.go @@ -1072,9 +1072,10 @@ metadata: translator := newEndpointTranslator( "linkerd", "trust.domain", - true, - true, + false, // forceOpaqueTransport + true, // enableH2Upgrade true, // enableEndpointFiltering + true, // enableIPv6 false, // extEndpointZoneWeights nil, // meshedHttp2ClientParams "service-name.service-ns", diff --git a/controller/cmd/destination/main.go b/controller/cmd/destination/main.go index 5532ee8aec374..2d6e4bde91fcb 100644 --- a/controller/cmd/destination/main.go +++ b/controller/cmd/destination/main.go @@ -32,6 +32,8 @@ func Main(args []string) { metricsAddr := cmd.String("metrics-addr", ":9996", "address to serve scrapable metrics on") kubeConfigPath := cmd.String("kubeconfig", "", "path to kube config") controllerNamespace := cmd.String("controller-namespace", "linkerd", "namespace in which Linkerd is installed") + forceOpaqueTransport := cmd.Bool("force-opaque-transport", false, + "Force proxies to route all outbound meshed traffic to the proxy's default inbound port") enableH2Upgrade := cmd.Bool("enable-h2-upgrade", true, "Enable transparently upgraded HTTP2 connections among pods in the service mesh") enableEndpointSlices := cmd.Bool("enable-endpoint-slices", true, @@ -171,6 +173,7 @@ func Main(args []string) { IdentityTrustDomain: *trustDomain, ClusterDomain: *clusterDomain, DefaultOpaquePorts: opaquePorts, + ForceOpaqueTransport: *forceOpaqueTransport, EnableH2Upgrade: *enableH2Upgrade, EnableEndpointSlices: *enableEndpointSlices, EnableIPv6: *enableIPv6,