Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(dest): Allow destination controller to force opaque transport #13699

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 10 additions & 7 deletions controller/api/destination/endpoint_profile_translator.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@ import (
)

type endpointProfileTranslator struct {
enableH2Upgrade bool
forceOpaqueTransport,
enableH2Upgrade bool
controllerNS string
identityTrustDomain string
defaultOpaquePorts map[uint32]struct{}
Expand Down Expand Up @@ -44,6 +45,7 @@ var endpointProfileUpdatesQueueOverflowCounter = promauto.NewCounter(
// newEndpointProfileTranslator translates pod updates and profile updates to
// DestinationProfiles for endpoints
func newEndpointProfileTranslator(
forceOpaqueTransport bool,
enableH2Upgrade bool,
controllerNS,
identityTrustDomain string,
Expand All @@ -54,10 +56,11 @@ func newEndpointProfileTranslator(
log *logging.Entry,
) *endpointProfileTranslator {
return &endpointProfileTranslator{
enableH2Upgrade: enableH2Upgrade,
controllerNS: controllerNS,
identityTrustDomain: identityTrustDomain,
defaultOpaquePorts: defaultOpaquePorts,
forceOpaqueTransport: forceOpaqueTransport,
enableH2Upgrade: enableH2Upgrade,
controllerNS: controllerNS,
identityTrustDomain: identityTrustDomain,
defaultOpaquePorts: defaultOpaquePorts,

meshedHttp2ClientParams: meshedHTTP2ClientParams,

Expand Down Expand Up @@ -158,10 +161,10 @@ func (ept *endpointProfileTranslator) createEndpoint(address watcher.Address, op
var weightedAddr *pb.WeightedAddr
var err error
if address.ExternalWorkload != nil {
weightedAddr, err = createWeightedAddrForExternalWorkload(address, opaquePorts, ept.meshedHttp2ClientParams)
weightedAddr, err = createWeightedAddrForExternalWorkload(address, ept.forceOpaqueTransport, opaquePorts, ept.meshedHttp2ClientParams)
} else {
weightedAddr, err = createWeightedAddr(address, opaquePorts,
ept.enableH2Upgrade, ept.identityTrustDomain, ept.controllerNS, ept.meshedHttp2ClientParams)
ept.forceOpaqueTransport, ept.enableH2Upgrade, ept.identityTrustDomain, ept.controllerNS, ept.meshedHttp2ClientParams)
}
if err != nil {
return nil, err
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func TestEndpointProfileTranslator(t *testing.T) {
}
log := logging.WithField("test", t.Name())
translator := newEndpointProfileTranslator(
true, "cluster", "identity", make(map[uint32]struct{}), nil,
false, true, "cluster", "identity", make(map[uint32]struct{}), nil,
mockGetProfileServer,
nil,
log,
Expand Down Expand Up @@ -84,7 +84,7 @@ func TestEndpointProfileTranslator(t *testing.T) {
log := logging.WithField("test", t.Name())
endStream := make(chan struct{})
translator := newEndpointProfileTranslator(
true, "cluster", "identity", make(map[uint32]struct{}), nil,
false, true, "cluster", "identity", make(map[uint32]struct{}), nil,
mockGetProfileServer,
endStream,
log,
Expand Down
49 changes: 29 additions & 20 deletions controller/api/destination/endpoint_translator.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ type (
nodeName string
defaultOpaquePorts map[uint32]struct{}

forceOpaqueTransport,
enableH2Upgrade,
enableEndpointFiltering,
enableIPv6,
Expand Down Expand Up @@ -83,6 +84,7 @@ var updatesQueueOverflowCounter = promauto.NewCounterVec(
func newEndpointTranslator(
controllerNS string,
identityTrustDomain string,
forceOpaqueTransport,
enableH2Upgrade,
enableEndpointFiltering,
enableIPv6,
Expand Down Expand Up @@ -115,6 +117,7 @@ func newEndpointTranslator(
nodeTopologyZone,
srcNodeName,
defaultOpaquePorts,
forceOpaqueTransport,
enableH2Upgrade,
enableEndpointFiltering,
enableIPv6,
Expand Down Expand Up @@ -409,14 +412,14 @@ func (et *endpointTranslator) sendClientAdd(set watcher.AddressSet) {
if address.Pod != nil {
opaquePorts = watcher.GetAnnotatedOpaquePorts(address.Pod, et.defaultOpaquePorts)
wa, err = createWeightedAddr(address, opaquePorts,
et.enableH2Upgrade, et.identityTrustDomain, et.controllerNS, et.meshedHTTP2ClientParams)
et.forceOpaqueTransport, et.enableH2Upgrade, et.identityTrustDomain, et.controllerNS, et.meshedHTTP2ClientParams)
if err != nil {
et.log.Errorf("Failed to translate Pod endpoints to weighted addr: %s", err)
continue
}
} else if address.ExternalWorkload != nil {
opaquePorts = watcher.GetAnnotatedOpaquePortsForExternalWorkload(address.ExternalWorkload, et.defaultOpaquePorts)
wa, err = createWeightedAddrForExternalWorkload(address, opaquePorts, et.meshedHTTP2ClientParams)
wa, err = createWeightedAddrForExternalWorkload(address, et.forceOpaqueTransport, opaquePorts, et.meshedHTTP2ClientParams)
if err != nil {
et.log.Errorf("Failed to translate ExternalWorkload endpoints to weighted addr: %s", err)
continue
Expand Down Expand Up @@ -531,6 +534,7 @@ func toAddr(address watcher.Address) (*net.TcpAddress, error) {

func createWeightedAddrForExternalWorkload(
address watcher.Address,
forceOpaqueTransport bool,
opaquePorts map[uint32]struct{},
http2 *pb.Http2ClientParams,
) (*pb.WeightedAddr, error) {
Expand All @@ -556,21 +560,23 @@ func createWeightedAddrForExternalWorkload(
weightedAddr.Http2 = http2

_, opaquePort := opaquePorts[address.Port]
opaquePort = opaquePort || address.OpaqueProtocol

if forceOpaqueTransport || opaquePort {
port, err := getInboundPortFromExternalWorkload(&address.ExternalWorkload.Spec)
if err != nil {
return nil, fmt.Errorf("failed to read inbound port from external workload: %w", err)
}
weightedAddr.ProtocolHint.OpaqueTransport = &pb.ProtocolHint_OpaqueTransport{InboundPort: port}
}

// If address is set as opaque by a Server, or its port is set as
// opaque by annotation or default value, then set the hinted protocol to
// Opaque.
if address.OpaqueProtocol || opaquePort {
if opaquePort {
weightedAddr.ProtocolHint.Protocol = &pb.ProtocolHint_Opaque_{
Opaque: &pb.ProtocolHint_Opaque{},
}

port, err := getInboundPortFromExternalWorkload(&address.ExternalWorkload.Spec)
if err != nil {
return nil, fmt.Errorf("failed to read inbound port: %w", err)
}
weightedAddr.ProtocolHint.OpaqueTransport = &pb.ProtocolHint_OpaqueTransport{
InboundPort: port,
}
} else {
weightedAddr.ProtocolHint.Protocol = &pb.ProtocolHint_H2_{
H2: &pb.ProtocolHint_H2{},
Expand Down Expand Up @@ -603,6 +609,7 @@ func createWeightedAddrForExternalWorkload(
func createWeightedAddr(
address watcher.Address,
opaquePorts map[uint32]struct{},
forceOpaqueTransport bool,
enableH2Upgrade bool,
identityTrustDomain string,
controllerNS string,
Expand Down Expand Up @@ -645,20 +652,22 @@ func createWeightedAddr(
weightedAddr.ProtocolHint = &pb.ProtocolHint{}

_, opaquePort := opaquePorts[address.Port]
// If address is set as opaque by a Server, or its port is set as
// opaque by annotation or default value, then set the hinted protocol to
// Opaque.
if address.OpaqueProtocol || opaquePort {
weightedAddr.ProtocolHint.Protocol = &pb.ProtocolHint_Opaque_{
Opaque: &pb.ProtocolHint_Opaque{},
}
opaquePort = opaquePort || address.OpaqueProtocol

if forceOpaqueTransport || opaquePort {
port, err := getInboundPort(&address.Pod.Spec)
if err != nil {
return nil, fmt.Errorf("failed to read inbound port: %w", err)
}
weightedAddr.ProtocolHint.OpaqueTransport = &pb.ProtocolHint_OpaqueTransport{
InboundPort: port,
weightedAddr.ProtocolHint.OpaqueTransport = &pb.ProtocolHint_OpaqueTransport{InboundPort: port}
}

// If address is set as opaque by a Server, or its port is set as
// opaque by annotation or default value, then set the hinted protocol to
// Opaque.
if opaquePort {
weightedAddr.ProtocolHint.Protocol = &pb.ProtocolHint_Opaque_{
Opaque: &pb.ProtocolHint_Opaque{},
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This (and above for external workloads) are the meat and potatoes of this change, with the rest being config wiring.

}
} else if enableH2Upgrade {
// If the pod is controlled by any Linkerd control plane, then it can be
Expand Down
2 changes: 2 additions & 0 deletions controller/api/destination/federated_service_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -351,6 +351,7 @@ func (fs *federatedService) remoteDiscoverySubscribe(
translator := newEndpointTranslator(
fs.config.ControllerNS,
remoteConfig.TrustDomain,
fs.config.ForceOpaqueTransport,
fs.config.EnableH2Upgrade,
false, // Disable endpoint filtering for remote discovery.
fs.config.EnableIPv6,
Expand Down Expand Up @@ -399,6 +400,7 @@ func (fs *federatedService) localDiscoverySubscribe(
translator := newEndpointTranslator(
fs.config.ControllerNS,
fs.config.IdentityTrustDomain,
fs.config.ForceOpaqueTransport,
fs.config.EnableH2Upgrade,
true,
fs.config.EnableIPv6,
Expand Down
4 changes: 4 additions & 0 deletions controller/api/destination/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ type (
IdentityTrustDomain,
ClusterDomain string

ForceOpaqueTransport,
EnableH2Upgrade,
EnableEndpointSlices,
EnableIPv6,
Expand Down Expand Up @@ -205,6 +206,7 @@ func (s *server) Get(dest *pb.GetDestination, stream pb.Destination_GetServer) e
translator := newEndpointTranslator(
s.config.ControllerNS,
remoteConfig.TrustDomain,
s.config.ForceOpaqueTransport,
s.config.EnableH2Upgrade,
false, // Disable endpoint filtering for remote discovery.
s.config.EnableIPv6,
Expand Down Expand Up @@ -239,6 +241,7 @@ func (s *server) Get(dest *pb.GetDestination, stream pb.Destination_GetServer) e
translator := newEndpointTranslator(
s.config.ControllerNS,
s.config.IdentityTrustDomain,
s.config.ForceOpaqueTransport,
s.config.EnableH2Upgrade,
true,
s.config.EnableIPv6,
Expand Down Expand Up @@ -531,6 +534,7 @@ func (s *server) subscribeToEndpointProfile(
canceled := stream.Context().Done()
streamEnd := make(chan struct{})
translator := newEndpointProfileTranslator(
s.config.ForceOpaqueTransport,
s.config.EnableH2Upgrade,
s.config.ControllerNS,
s.config.IdentityTrustDomain,
Expand Down
5 changes: 3 additions & 2 deletions controller/api/destination/test_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -1072,9 +1072,10 @@ metadata:
translator := newEndpointTranslator(
"linkerd",
"trust.domain",
true,
true,
false, // forceOpaqueTransport
true, // enableH2Upgrade
true, // enableEndpointFiltering
true, // enableIPv6
false, // extEndpointZoneWeights
nil, // meshedHttp2ClientParams
"service-name.service-ns",
Expand Down
3 changes: 3 additions & 0 deletions controller/cmd/destination/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ func Main(args []string) {
metricsAddr := cmd.String("metrics-addr", ":9996", "address to serve scrapable metrics on")
kubeConfigPath := cmd.String("kubeconfig", "", "path to kube config")
controllerNamespace := cmd.String("controller-namespace", "linkerd", "namespace in which Linkerd is installed")
forceOpaqueTransport := cmd.Bool("force-opaque-transport", false,
"Force proxies to route all outbound meshed traffic to the proxy's default inbound port")
Comment on lines +35 to +36
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We probably want to reframe this a bit as far as the external- (and especially user-) facing interface is concerned. It's not critical to address in the command line flag; though it is probably generally better for the terms to be consistent between the implementation and the helm chart...

https://github.com/BuoyantIO/buoyant-linkerd2/blob/eddd3f6e7812b82df24d899039c904fe8a89fb3a/charts/linkerd-control-plane/templates/destination.yaml#L199-L209

We could frame this as an "outbound TLS mode", with values of:

  • legacy-transparent -- the old behavior: transparently implement mTLS on connections
  • transport-protocol -- the new behavior: use the transport protocol to route traffic

in that case, we could call this flag something like force-legacy-outbound-tls-mode.

enableH2Upgrade := cmd.Bool("enable-h2-upgrade", true,
"Enable transparently upgraded HTTP2 connections among pods in the service mesh")
enableEndpointSlices := cmd.Bool("enable-endpoint-slices", true,
Expand Down Expand Up @@ -171,6 +173,7 @@ func Main(args []string) {
IdentityTrustDomain: *trustDomain,
ClusterDomain: *clusterDomain,
DefaultOpaquePorts: opaquePorts,
ForceOpaqueTransport: *forceOpaqueTransport,
EnableH2Upgrade: *enableH2Upgrade,
EnableEndpointSlices: *enableEndpointSlices,
EnableIPv6: *enableIPv6,
Expand Down
Loading