Skip to content

Commit

Permalink
feat(dest): Allow destination controller to force opaque transport
Browse files Browse the repository at this point in the history
Non-opaque meshed traffic currently flows over the original destination port, which requires the inbound proxy to do protocol detection.

This adds an option to the destination controller that configures all meshed traffic to flow to the inbound proxy's inbound port. This will allow us to include more session protocol information in the future, obviating the need for inbound protocol detection.

This doesn't do much in the way of testing, since the default behavior should be unchanged. When this default changes, more validation will be done on the behavior here.

Signed-off-by: Scott Fleener <[email protected]>
  • Loading branch information
sfleen committed Feb 21, 2025
1 parent fe1416a commit 331ef1e
Show file tree
Hide file tree
Showing 7 changed files with 53 additions and 31 deletions.
17 changes: 10 additions & 7 deletions controller/api/destination/endpoint_profile_translator.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@ import (
)

type endpointProfileTranslator struct {
enableH2Upgrade bool
forceOpaqueTransport,
enableH2Upgrade bool
controllerNS string
identityTrustDomain string
defaultOpaquePorts map[uint32]struct{}
Expand Down Expand Up @@ -44,6 +45,7 @@ var endpointProfileUpdatesQueueOverflowCounter = promauto.NewCounter(
// newEndpointProfileTranslator translates pod updates and profile updates to
// DestinationProfiles for endpoints
func newEndpointProfileTranslator(
forceOpaqueTransport bool,
enableH2Upgrade bool,
controllerNS,
identityTrustDomain string,
Expand All @@ -54,10 +56,11 @@ func newEndpointProfileTranslator(
log *logging.Entry,
) *endpointProfileTranslator {
return &endpointProfileTranslator{
enableH2Upgrade: enableH2Upgrade,
controllerNS: controllerNS,
identityTrustDomain: identityTrustDomain,
defaultOpaquePorts: defaultOpaquePorts,
forceOpaqueTransport: forceOpaqueTransport,
enableH2Upgrade: enableH2Upgrade,
controllerNS: controllerNS,
identityTrustDomain: identityTrustDomain,
defaultOpaquePorts: defaultOpaquePorts,

meshedHttp2ClientParams: meshedHTTP2ClientParams,

Expand Down Expand Up @@ -158,10 +161,10 @@ func (ept *endpointProfileTranslator) createEndpoint(address watcher.Address, op
var weightedAddr *pb.WeightedAddr
var err error
if address.ExternalWorkload != nil {
weightedAddr, err = createWeightedAddrForExternalWorkload(address, opaquePorts, ept.meshedHttp2ClientParams)
weightedAddr, err = createWeightedAddrForExternalWorkload(address, ept.forceOpaqueTransport, opaquePorts, ept.meshedHttp2ClientParams)
} else {
weightedAddr, err = createWeightedAddr(address, opaquePorts,
ept.enableH2Upgrade, ept.identityTrustDomain, ept.controllerNS, ept.meshedHttp2ClientParams)
ept.forceOpaqueTransport, ept.enableH2Upgrade, ept.identityTrustDomain, ept.controllerNS, ept.meshedHttp2ClientParams)
}
if err != nil {
return nil, err
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func TestEndpointProfileTranslator(t *testing.T) {
}
log := logging.WithField("test", t.Name())
translator := newEndpointProfileTranslator(
true, "cluster", "identity", make(map[uint32]struct{}), nil,
false, true, "cluster", "identity", make(map[uint32]struct{}), nil,
mockGetProfileServer,
nil,
log,
Expand Down Expand Up @@ -84,7 +84,7 @@ func TestEndpointProfileTranslator(t *testing.T) {
log := logging.WithField("test", t.Name())
endStream := make(chan struct{})
translator := newEndpointProfileTranslator(
true, "cluster", "identity", make(map[uint32]struct{}), nil,
false, true, "cluster", "identity", make(map[uint32]struct{}), nil,
mockGetProfileServer,
endStream,
log,
Expand Down
49 changes: 29 additions & 20 deletions controller/api/destination/endpoint_translator.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ type (
nodeName string
defaultOpaquePorts map[uint32]struct{}

forceOpaqueTransport,
enableH2Upgrade,
enableEndpointFiltering,
enableIPv6,
Expand Down Expand Up @@ -83,6 +84,7 @@ var updatesQueueOverflowCounter = promauto.NewCounterVec(
func newEndpointTranslator(
controllerNS string,
identityTrustDomain string,
forceOpaqueTransport,
enableH2Upgrade,
enableEndpointFiltering,
enableIPv6,
Expand Down Expand Up @@ -115,6 +117,7 @@ func newEndpointTranslator(
nodeTopologyZone,
srcNodeName,
defaultOpaquePorts,
forceOpaqueTransport,
enableH2Upgrade,
enableEndpointFiltering,
enableIPv6,
Expand Down Expand Up @@ -409,14 +412,14 @@ func (et *endpointTranslator) sendClientAdd(set watcher.AddressSet) {
if address.Pod != nil {
opaquePorts = watcher.GetAnnotatedOpaquePorts(address.Pod, et.defaultOpaquePorts)
wa, err = createWeightedAddr(address, opaquePorts,
et.enableH2Upgrade, et.identityTrustDomain, et.controllerNS, et.meshedHTTP2ClientParams)
et.forceOpaqueTransport, et.enableH2Upgrade, et.identityTrustDomain, et.controllerNS, et.meshedHTTP2ClientParams)
if err != nil {
et.log.Errorf("Failed to translate Pod endpoints to weighted addr: %s", err)
continue
}
} else if address.ExternalWorkload != nil {
opaquePorts = watcher.GetAnnotatedOpaquePortsForExternalWorkload(address.ExternalWorkload, et.defaultOpaquePorts)
wa, err = createWeightedAddrForExternalWorkload(address, opaquePorts, et.meshedHTTP2ClientParams)
wa, err = createWeightedAddrForExternalWorkload(address, et.forceOpaqueTransport, opaquePorts, et.meshedHTTP2ClientParams)
if err != nil {
et.log.Errorf("Failed to translate ExternalWorkload endpoints to weighted addr: %s", err)
continue
Expand Down Expand Up @@ -531,6 +534,7 @@ func toAddr(address watcher.Address) (*net.TcpAddress, error) {

func createWeightedAddrForExternalWorkload(
address watcher.Address,
forceOpaqueTransport bool,
opaquePorts map[uint32]struct{},
http2 *pb.Http2ClientParams,
) (*pb.WeightedAddr, error) {
Expand All @@ -556,21 +560,23 @@ func createWeightedAddrForExternalWorkload(
weightedAddr.Http2 = http2

_, opaquePort := opaquePorts[address.Port]
opaquePort = opaquePort || address.OpaqueProtocol

if forceOpaqueTransport || opaquePort {
port, err := getInboundPortFromExternalWorkload(&address.ExternalWorkload.Spec)
if err != nil {
return nil, fmt.Errorf("failed to read inbound port from external workload: %w", err)
}
weightedAddr.ProtocolHint.OpaqueTransport = &pb.ProtocolHint_OpaqueTransport{InboundPort: port}
}

// If address is set as opaque by a Server, or its port is set as
// opaque by annotation or default value, then set the hinted protocol to
// Opaque.
if address.OpaqueProtocol || opaquePort {
if opaquePort {
weightedAddr.ProtocolHint.Protocol = &pb.ProtocolHint_Opaque_{
Opaque: &pb.ProtocolHint_Opaque{},
}

port, err := getInboundPortFromExternalWorkload(&address.ExternalWorkload.Spec)
if err != nil {
return nil, fmt.Errorf("failed to read inbound port: %w", err)
}
weightedAddr.ProtocolHint.OpaqueTransport = &pb.ProtocolHint_OpaqueTransport{
InboundPort: port,
}
} else {
weightedAddr.ProtocolHint.Protocol = &pb.ProtocolHint_H2_{
H2: &pb.ProtocolHint_H2{},
Expand Down Expand Up @@ -603,6 +609,7 @@ func createWeightedAddrForExternalWorkload(
func createWeightedAddr(
address watcher.Address,
opaquePorts map[uint32]struct{},
forceOpaqueTransport bool,
enableH2Upgrade bool,
identityTrustDomain string,
controllerNS string,
Expand Down Expand Up @@ -645,20 +652,22 @@ func createWeightedAddr(
weightedAddr.ProtocolHint = &pb.ProtocolHint{}

_, opaquePort := opaquePorts[address.Port]
// If address is set as opaque by a Server, or its port is set as
// opaque by annotation or default value, then set the hinted protocol to
// Opaque.
if address.OpaqueProtocol || opaquePort {
weightedAddr.ProtocolHint.Protocol = &pb.ProtocolHint_Opaque_{
Opaque: &pb.ProtocolHint_Opaque{},
}
opaquePort = opaquePort || address.OpaqueProtocol

if forceOpaqueTransport || opaquePort {
port, err := getInboundPort(&address.Pod.Spec)
if err != nil {
return nil, fmt.Errorf("failed to read inbound port: %w", err)
}
weightedAddr.ProtocolHint.OpaqueTransport = &pb.ProtocolHint_OpaqueTransport{
InboundPort: port,
weightedAddr.ProtocolHint.OpaqueTransport = &pb.ProtocolHint_OpaqueTransport{InboundPort: port}
}

// If address is set as opaque by a Server, or its port is set as
// opaque by annotation or default value, then set the hinted protocol to
// Opaque.
if opaquePort {
weightedAddr.ProtocolHint.Protocol = &pb.ProtocolHint_Opaque_{
Opaque: &pb.ProtocolHint_Opaque{},
}
} else if enableH2Upgrade {
// If the pod is controlled by any Linkerd control plane, then it can be
Expand Down
2 changes: 2 additions & 0 deletions controller/api/destination/federated_service_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -351,6 +351,7 @@ func (fs *federatedService) remoteDiscoverySubscribe(
translator := newEndpointTranslator(
fs.config.ControllerNS,
remoteConfig.TrustDomain,
fs.config.ForceOpaqueTransport,
fs.config.EnableH2Upgrade,
false, // Disable endpoint filtering for remote discovery.
fs.config.EnableIPv6,
Expand Down Expand Up @@ -399,6 +400,7 @@ func (fs *federatedService) localDiscoverySubscribe(
translator := newEndpointTranslator(
fs.config.ControllerNS,
fs.config.IdentityTrustDomain,
fs.config.ForceOpaqueTransport,
fs.config.EnableH2Upgrade,
true,
fs.config.EnableIPv6,
Expand Down
4 changes: 4 additions & 0 deletions controller/api/destination/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ type (
IdentityTrustDomain,
ClusterDomain string

ForceOpaqueTransport,
EnableH2Upgrade,
EnableEndpointSlices,
EnableIPv6,
Expand Down Expand Up @@ -205,6 +206,7 @@ func (s *server) Get(dest *pb.GetDestination, stream pb.Destination_GetServer) e
translator := newEndpointTranslator(
s.config.ControllerNS,
remoteConfig.TrustDomain,
s.config.ForceOpaqueTransport,
s.config.EnableH2Upgrade,
false, // Disable endpoint filtering for remote discovery.
s.config.EnableIPv6,
Expand Down Expand Up @@ -239,6 +241,7 @@ func (s *server) Get(dest *pb.GetDestination, stream pb.Destination_GetServer) e
translator := newEndpointTranslator(
s.config.ControllerNS,
s.config.IdentityTrustDomain,
s.config.ForceOpaqueTransport,
s.config.EnableH2Upgrade,
true,
s.config.EnableIPv6,
Expand Down Expand Up @@ -531,6 +534,7 @@ func (s *server) subscribeToEndpointProfile(
canceled := stream.Context().Done()
streamEnd := make(chan struct{})
translator := newEndpointProfileTranslator(
s.config.ForceOpaqueTransport,
s.config.EnableH2Upgrade,
s.config.ControllerNS,
s.config.IdentityTrustDomain,
Expand Down
5 changes: 3 additions & 2 deletions controller/api/destination/test_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -1072,9 +1072,10 @@ metadata:
translator := newEndpointTranslator(
"linkerd",
"trust.domain",
true,
true,
false, // forceOpaqueTransport
true, // enableH2Upgrade
true, // enableEndpointFiltering
true, // enableIPv6
false, // extEndpointZoneWeights
nil, // meshedHttp2ClientParams
"service-name.service-ns",
Expand Down
3 changes: 3 additions & 0 deletions controller/cmd/destination/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ func Main(args []string) {
metricsAddr := cmd.String("metrics-addr", ":9996", "address to serve scrapable metrics on")
kubeConfigPath := cmd.String("kubeconfig", "", "path to kube config")
controllerNamespace := cmd.String("controller-namespace", "linkerd", "namespace in which Linkerd is installed")
forceOpaqueTransport := cmd.Bool("force-opaque-transport", false,
"Force proxies to route all outbound meshed traffic to the proxy's default inbound port")
enableH2Upgrade := cmd.Bool("enable-h2-upgrade", true,
"Enable transparently upgraded HTTP2 connections among pods in the service mesh")
enableEndpointSlices := cmd.Bool("enable-endpoint-slices", true,
Expand Down Expand Up @@ -171,6 +173,7 @@ func Main(args []string) {
IdentityTrustDomain: *trustDomain,
ClusterDomain: *clusterDomain,
DefaultOpaquePorts: opaquePorts,
ForceOpaqueTransport: *forceOpaqueTransport,
EnableH2Upgrade: *enableH2Upgrade,
EnableEndpointSlices: *enableEndpointSlices,
EnableIPv6: *enableIPv6,
Expand Down

0 comments on commit 331ef1e

Please sign in to comment.