From 20301d1fb6f58763e52bd92aee9e22052456f2fa Mon Sep 17 00:00:00 2001 From: Sean Sullivan Date: Thu, 6 Jul 2023 21:22:07 -0700 Subject: [PATCH] StreamTranslator and FallbackExecutor for WebSockets Kubernetes-commit: 168998e87bfd49a1b0bc6402761fafd5ace3bb3b --- pkg/util/httpstream/httpstream.go | 21 +++ pkg/util/httpstream/httpstream_test.go | 39 +++++ pkg/util/httpstream/spdy/roundtripper.go | 55 +++++-- pkg/util/httpstream/spdy/roundtripper_test.go | 85 +++++++++- pkg/util/httpstream/wsstream/conn.go | 77 ++++++++-- pkg/util/httpstream/wsstream/conn_test.go | 145 ++++++++++++++++++ pkg/util/proxy/dial.go | 4 +- pkg/util/proxy/dial_test.go | 2 +- pkg/util/proxy/upgradeaware.go | 2 +- 9 files changed, 401 insertions(+), 29 deletions(-) diff --git a/pkg/util/httpstream/httpstream.go b/pkg/util/httpstream/httpstream.go index 32f075782..a32fce5a0 100644 --- a/pkg/util/httpstream/httpstream.go +++ b/pkg/util/httpstream/httpstream.go @@ -17,6 +17,7 @@ limitations under the License. package httpstream import ( + "errors" "fmt" "io" "net/http" @@ -95,6 +96,26 @@ type Stream interface { Identifier() uint32 } +// UpgradeFailureError encapsulates the cause for why the streaming +// upgrade request failed. Implements error interface. +type UpgradeFailureError struct { + Cause error +} + +func (u *UpgradeFailureError) Error() string { + return fmt.Sprintf("unable to upgrade streaming request: %s", u.Cause) +} + +// IsUpgradeFailure returns true if the passed error is (or wrapped error contains) +// the UpgradeFailureError. +func IsUpgradeFailure(err error) bool { + if err == nil { + return false + } + var upgradeErr *UpgradeFailureError + return errors.As(err, &upgradeErr) +} + // IsUpgradeRequest returns true if the given request is a connection upgrade request func IsUpgradeRequest(req *http.Request) bool { for _, h := range req.Header[http.CanonicalHeaderKey(HeaderConnection)] { diff --git a/pkg/util/httpstream/httpstream_test.go b/pkg/util/httpstream/httpstream_test.go index e988bce2b..11fb92863 100644 --- a/pkg/util/httpstream/httpstream_test.go +++ b/pkg/util/httpstream/httpstream_test.go @@ -17,6 +17,8 @@ limitations under the License. package httpstream import ( + "errors" + "fmt" "net/http" "reflect" "testing" @@ -129,3 +131,40 @@ func TestHandshake(t *testing.T) { } } } + +func TestIsUpgradeFailureError(t *testing.T) { + testCases := map[string]struct { + err error + expected bool + }{ + "nil error should return false": { + err: nil, + expected: false, + }, + "Non-upgrade error should return false": { + err: fmt.Errorf("this is not an upgrade error"), + expected: false, + }, + "UpgradeFailure error should return true": { + err: &UpgradeFailureError{}, + expected: true, + }, + "Wrapped Non-UpgradeFailure error should return false": { + err: fmt.Errorf("%s: %w", "first error", errors.New("Non-upgrade error")), + expected: false, + }, + "Wrapped UpgradeFailure error should return true": { + err: fmt.Errorf("%s: %w", "first error", &UpgradeFailureError{}), + expected: true, + }, + } + + for name, test := range testCases { + t.Run(name, func(t *testing.T) { + actual := IsUpgradeFailure(test.err) + if test.expected != actual { + t.Errorf("expected upgrade failure %t, got %t", test.expected, actual) + } + }) + } +} diff --git a/pkg/util/httpstream/spdy/roundtripper.go b/pkg/util/httpstream/spdy/roundtripper.go index 7fe52ee56..c78326fa3 100644 --- a/pkg/util/httpstream/spdy/roundtripper.go +++ b/pkg/util/httpstream/spdy/roundtripper.go @@ -38,6 +38,7 @@ import ( "k8s.io/apimachinery/pkg/runtime/serializer" "k8s.io/apimachinery/pkg/util/httpstream" utilnet "k8s.io/apimachinery/pkg/util/net" + apiproxy "k8s.io/apimachinery/pkg/util/proxy" "k8s.io/apimachinery/third_party/forked/golang/netutil" ) @@ -68,6 +69,10 @@ type SpdyRoundTripper struct { // pingPeriod is a period for sending Ping frames over established // connections. pingPeriod time.Duration + + // upgradeTransport is an optional substitute for dialing if present. This field is + // mutually exclusive with the "tlsConfig", "Dialer", and "proxier". + upgradeTransport http.RoundTripper } var _ utilnet.TLSClientConfigHolder = &SpdyRoundTripper{} @@ -76,43 +81,61 @@ var _ utilnet.Dialer = &SpdyRoundTripper{} // NewRoundTripper creates a new SpdyRoundTripper that will use the specified // tlsConfig. -func NewRoundTripper(tlsConfig *tls.Config) *SpdyRoundTripper { +func NewRoundTripper(tlsConfig *tls.Config) (*SpdyRoundTripper, error) { return NewRoundTripperWithConfig(RoundTripperConfig{ - TLS: tlsConfig, + TLS: tlsConfig, + UpgradeTransport: nil, }) } // NewRoundTripperWithProxy creates a new SpdyRoundTripper that will use the // specified tlsConfig and proxy func. -func NewRoundTripperWithProxy(tlsConfig *tls.Config, proxier func(*http.Request) (*url.URL, error)) *SpdyRoundTripper { +func NewRoundTripperWithProxy(tlsConfig *tls.Config, proxier func(*http.Request) (*url.URL, error)) (*SpdyRoundTripper, error) { return NewRoundTripperWithConfig(RoundTripperConfig{ - TLS: tlsConfig, - Proxier: proxier, + TLS: tlsConfig, + Proxier: proxier, + UpgradeTransport: nil, }) } // NewRoundTripperWithConfig creates a new SpdyRoundTripper with the specified -// configuration. -func NewRoundTripperWithConfig(cfg RoundTripperConfig) *SpdyRoundTripper { +// configuration. Returns an error if the SpdyRoundTripper is misconfigured. +func NewRoundTripperWithConfig(cfg RoundTripperConfig) (*SpdyRoundTripper, error) { + // Process UpgradeTransport, which is mutually exclusive to TLSConfig and Proxier. + if cfg.UpgradeTransport != nil { + if cfg.TLS != nil || cfg.Proxier != nil { + return nil, fmt.Errorf("SpdyRoundTripper: UpgradeTransport is mutually exclusive to TLSConfig or Proxier") + } + tlsConfig, err := utilnet.TLSClientConfig(cfg.UpgradeTransport) + if err != nil { + return nil, fmt.Errorf("SpdyRoundTripper: Unable to retrieve TLSConfig from UpgradeTransport: %v", err) + } + cfg.TLS = tlsConfig + } if cfg.Proxier == nil { cfg.Proxier = utilnet.NewProxierWithNoProxyCIDR(http.ProxyFromEnvironment) } return &SpdyRoundTripper{ - tlsConfig: cfg.TLS, - proxier: cfg.Proxier, - pingPeriod: cfg.PingPeriod, - } + tlsConfig: cfg.TLS, + proxier: cfg.Proxier, + pingPeriod: cfg.PingPeriod, + upgradeTransport: cfg.UpgradeTransport, + }, nil } // RoundTripperConfig is a set of options for an SpdyRoundTripper. type RoundTripperConfig struct { - // TLS configuration used by the round tripper. + // TLS configuration used by the round tripper if UpgradeTransport not present. TLS *tls.Config // Proxier is a proxy function invoked on each request. Optional. Proxier func(*http.Request) (*url.URL, error) // PingPeriod is a period for sending SPDY Pings on the connection. // Optional. PingPeriod time.Duration + // UpgradeTransport is a subtitute transport used for dialing. If set, + // this field will be used instead of "TLS" and "Proxier" for connection creation. + // Optional. + UpgradeTransport http.RoundTripper } // TLSClientConfig implements pkg/util/net.TLSClientConfigHolder for proper TLS checking during @@ -123,7 +146,13 @@ func (s *SpdyRoundTripper) TLSClientConfig() *tls.Config { // Dial implements k8s.io/apimachinery/pkg/util/net.Dialer. func (s *SpdyRoundTripper) Dial(req *http.Request) (net.Conn, error) { - conn, err := s.dial(req) + var conn net.Conn + var err error + if s.upgradeTransport != nil { + conn, err = apiproxy.DialURL(req.Context(), req.URL, s.upgradeTransport) + } else { + conn, err = s.dial(req) + } if err != nil { return nil, err } diff --git a/pkg/util/httpstream/spdy/roundtripper_test.go b/pkg/util/httpstream/spdy/roundtripper_test.go index b2c2b8851..de88f4e60 100644 --- a/pkg/util/httpstream/spdy/roundtripper_test.go +++ b/pkg/util/httpstream/spdy/roundtripper_test.go @@ -25,7 +25,9 @@ import ( "net/http" "net/http/httptest" "net/url" + "reflect" "strconv" + "strings" "testing" "github.com/armon/go-socks5" @@ -324,7 +326,10 @@ func TestRoundTripAndNewConnection(t *testing.T) { t.Fatalf("error creating request: %s", err) } - spdyTransport := NewRoundTripper(testCase.clientTLS) + spdyTransport, err := NewRoundTripper(testCase.clientTLS) + if err != nil { + t.Fatalf("error creating SpdyRoundTripper: %v", err) + } var proxierCalled bool var proxyCalledWithHost string @@ -428,6 +433,74 @@ func TestRoundTripAndNewConnection(t *testing.T) { } } +// Tests SpdyRoundTripper constructors +func TestRoundTripConstuctor(t *testing.T) { + testCases := map[string]struct { + tlsConfig *tls.Config + proxier func(req *http.Request) (*url.URL, error) + upgradeTransport http.RoundTripper + expectedTLSConfig *tls.Config + errMsg string + }{ + "Basic TLSConfig; no error": { + tlsConfig: &tls.Config{InsecureSkipVerify: true}, + expectedTLSConfig: &tls.Config{InsecureSkipVerify: true}, + upgradeTransport: nil, + }, + "Basic TLSConfig and Proxier: no error": { + tlsConfig: &tls.Config{InsecureSkipVerify: true}, + proxier: func(req *http.Request) (*url.URL, error) { return nil, nil }, + expectedTLSConfig: &tls.Config{InsecureSkipVerify: true}, + upgradeTransport: nil, + }, + "TLSConfig with UpgradeTransport: error": { + tlsConfig: &tls.Config{InsecureSkipVerify: true}, + upgradeTransport: &http.Transport{TLSClientConfig: &tls.Config{InsecureSkipVerify: true}}, + expectedTLSConfig: &tls.Config{InsecureSkipVerify: true}, + errMsg: "SpdyRoundTripper: UpgradeTransport is mutually exclusive to TLSConfig or Proxier", + }, + "Proxier with UpgradeTransport: error": { + proxier: func(req *http.Request) (*url.URL, error) { return nil, nil }, + upgradeTransport: &http.Transport{TLSClientConfig: &tls.Config{InsecureSkipVerify: true}}, + expectedTLSConfig: &tls.Config{InsecureSkipVerify: true}, + errMsg: "SpdyRoundTripper: UpgradeTransport is mutually exclusive to TLSConfig or Proxier", + }, + "Only UpgradeTransport: no error": { + upgradeTransport: &http.Transport{TLSClientConfig: &tls.Config{InsecureSkipVerify: true}}, + expectedTLSConfig: &tls.Config{InsecureSkipVerify: true}, + }, + } + for name, testCase := range testCases { + t.Run(name, func(t *testing.T) { + spdyRoundTripper, err := NewRoundTripperWithConfig( + RoundTripperConfig{ + TLS: testCase.tlsConfig, + Proxier: testCase.proxier, + UpgradeTransport: testCase.upgradeTransport, + }, + ) + if testCase.errMsg != "" { + if err == nil { + t.Fatalf("expected error but received none") + } + if !strings.Contains(err.Error(), testCase.errMsg) { + t.Fatalf("expected error message (%s), got (%s)", err.Error(), testCase.errMsg) + } + } + if testCase.errMsg == "" { + if err != nil { + t.Fatalf("unexpected error received: %v", err) + } + actualTLSConfig := spdyRoundTripper.TLSClientConfig() + if !reflect.DeepEqual(testCase.expectedTLSConfig, actualTLSConfig) { + t.Errorf("expected TLSConfig (%v), got (%v)", + testCase.expectedTLSConfig, actualTLSConfig) + } + } + }) + } +} + type Interceptor struct { Authorization socks5.AuthContext proxyCalledWithHost *string @@ -544,7 +617,10 @@ func TestRoundTripSocks5AndNewConnection(t *testing.T) { t.Fatalf("error creating request: %s", err) } - spdyTransport := NewRoundTripper(testCase.clientTLS) + spdyTransport, err := NewRoundTripper(testCase.clientTLS) + if err != nil { + t.Fatalf("error creating SpdyRoundTripper: %v", err) + } var proxierCalled bool var proxyCalledWithHost string @@ -704,7 +780,10 @@ func TestRoundTripPassesContextToDialer(t *testing.T) { cancel() req, err := http.NewRequestWithContext(ctx, "GET", u, nil) require.NoError(t, err) - spdyTransport := NewRoundTripper(&tls.Config{}) + spdyTransport, err := NewRoundTripper(&tls.Config{}) + if err != nil { + t.Fatalf("error creating SpdyRoundTripper: %v", err) + } _, err = spdyTransport.Dial(req) assert.EqualError(t, err, "dial tcp 127.0.0.1:1233: operation was canceled") }) diff --git a/pkg/util/httpstream/wsstream/conn.go b/pkg/util/httpstream/wsstream/conn.go index d153070ce..7cfdd0632 100644 --- a/pkg/util/httpstream/wsstream/conn.go +++ b/pkg/util/httpstream/wsstream/conn.go @@ -32,6 +32,8 @@ import ( "k8s.io/klog/v2" ) +const WebSocketProtocolHeader = "Sec-Websocket-Protocol" + // The Websocket subprotocol "channel.k8s.io" prepends each binary message with a byte indicating // the channel number (zero indexed) the message was sent on. Messages in both directions should // prefix their messages with this channel byte. When used for remote execution, the channel numbers @@ -87,6 +89,23 @@ func IsWebSocketRequest(req *http.Request) bool { return httpstream.IsUpgradeRequest(req) } +// IsWebSocketRequestWithStreamCloseProtocol returns true if the request contains headers +// identifying that it is requesting a websocket upgrade with a remotecommand protocol +// version that supports the "CLOSE" signal; false otherwise. +func IsWebSocketRequestWithStreamCloseProtocol(req *http.Request) bool { + if !IsWebSocketRequest(req) { + return false + } + requestedProtocols := strings.TrimSpace(req.Header.Get(WebSocketProtocolHeader)) + for _, requestedProtocol := range strings.Split(requestedProtocols, ",") { + if protocolSupportsStreamClose(strings.TrimSpace(requestedProtocol)) { + return true + } + } + + return false +} + // IgnoreReceives reads from a WebSocket until it is closed, then returns. If timeout is set, the // read and write deadlines are pushed every time a new message is received. func IgnoreReceives(ws *websocket.Conn, timeout time.Duration) { @@ -168,15 +187,46 @@ func (conn *Conn) SetIdleTimeout(duration time.Duration) { conn.timeout = duration } +// SetWriteDeadline sets a timeout on writing to the websocket connection. The +// passed "duration" identifies how far into the future the write must complete +// by before the timeout fires. +func (conn *Conn) SetWriteDeadline(duration time.Duration) { + conn.ws.SetWriteDeadline(time.Now().Add(duration)) //nolint:errcheck +} + // Open the connection and create channels for reading and writing. It returns // the selected subprotocol, a slice of channels and an error. func (conn *Conn) Open(w http.ResponseWriter, req *http.Request) (string, []io.ReadWriteCloser, error) { + // serveHTTPComplete is channel that is closed/selected when "websocket#ServeHTTP" finishes. + serveHTTPComplete := make(chan struct{}) + // Ensure panic in spawned goroutine is propagated into the parent goroutine. + panicChan := make(chan any, 1) go func() { - defer runtime.HandleCrash() - defer conn.Close() + // If websocket server returns, propagate panic if necessary. Otherwise, + // signal HTTPServe finished by closing "serveHTTPComplete". + defer func() { + if p := recover(); p != nil { + panicChan <- p + } else { + close(serveHTTPComplete) + } + }() websocket.Server{Handshake: conn.handshake, Handler: conn.handle}.ServeHTTP(w, req) }() - <-conn.ready + + // In normal circumstances, "websocket.Server#ServeHTTP" calls "initialize" which closes + // "conn.ready" and then blocks until serving is complete. + select { + case <-conn.ready: + klog.V(8).Infof("websocket server initialized--serving") + case <-serveHTTPComplete: + // websocket server returned before completing initialization; cleanup and return error. + conn.closeNonThreadSafe() //nolint:errcheck + return "", nil, fmt.Errorf("websocket server finished before becoming ready") + case p := <-panicChan: + panic(p) + } + rwc := make([]io.ReadWriteCloser, len(conn.channels)) for i := range conn.channels { rwc[i] = conn.channels[i] @@ -225,14 +275,23 @@ func (conn *Conn) resetTimeout() { } } -// Close is only valid after Open has been called -func (conn *Conn) Close() error { - <-conn.ready +// closeNonThreadSafe cleans up by closing streams and the websocket +// connection *without* waiting for the "ready" channel. +func (conn *Conn) closeNonThreadSafe() error { for _, s := range conn.channels { s.Close() } - conn.ws.Close() - return nil + var err error + if conn.ws != nil { + err = conn.ws.Close() + } + return err +} + +// Close is only valid after Open has been called +func (conn *Conn) Close() error { + <-conn.ready + return conn.closeNonThreadSafe() } // protocolSupportsStreamClose returns true if the passed protocol @@ -244,8 +303,8 @@ func protocolSupportsStreamClose(protocol string) bool { // handle implements a websocket handler. func (conn *Conn) handle(ws *websocket.Conn) { - defer conn.Close() conn.initialize(ws) + defer conn.Close() supportsStreamClose := protocolSupportsStreamClose(conn.selectedProtocol) for { diff --git a/pkg/util/httpstream/wsstream/conn_test.go b/pkg/util/httpstream/wsstream/conn_test.go index 8d9f5d5d4..e4a88a1a8 100644 --- a/pkg/util/httpstream/wsstream/conn_test.go +++ b/pkg/util/httpstream/wsstream/conn_test.go @@ -25,6 +25,8 @@ import ( "sync" "testing" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "golang.org/x/net/websocket" ) @@ -271,3 +273,146 @@ func TestVersionedConn(t *testing.T) { }() } } + +func TestIsWebSocketRequestWithStreamCloseProtocol(t *testing.T) { + tests := map[string]struct { + headers map[string]string + expected bool + }{ + "No headers returns false": { + headers: map[string]string{}, + expected: false, + }, + "Only connection upgrade header is false": { + headers: map[string]string{ + "Connection": "upgrade", + }, + expected: false, + }, + "Only websocket upgrade header is false": { + headers: map[string]string{ + "Upgrade": "websocket", + }, + expected: false, + }, + "Only websocket and connection upgrade headers is false": { + headers: map[string]string{ + "Connection": "upgrade", + "Upgrade": "websocket", + }, + expected: false, + }, + "Missing connection/upgrade header is false": { + headers: map[string]string{ + "Upgrade": "websocket", + WebSocketProtocolHeader: "v5.channel.k8s.io", + }, + expected: false, + }, + "Websocket connection upgrade headers with v5 protocol is true": { + headers: map[string]string{ + "Connection": "upgrade", + "Upgrade": "websocket", + WebSocketProtocolHeader: "v5.channel.k8s.io", + }, + expected: true, + }, + "Websocket connection upgrade headers with wrong case v5 protocol is false": { + headers: map[string]string{ + "Connection": "upgrade", + "Upgrade": "websocket", + WebSocketProtocolHeader: "v5.CHANNEL.k8s.io", // header value is case-sensitive + }, + expected: false, + }, + "Websocket connection upgrade headers with v4 protocol is false": { + headers: map[string]string{ + "Connection": "upgrade", + "Upgrade": "websocket", + WebSocketProtocolHeader: "v4.channel.k8s.io", + }, + expected: false, + }, + "Websocket connection upgrade headers with multiple protocols but missing v5 is false": { + headers: map[string]string{ + "Connection": "upgrade", + "Upgrade": "websocket", + WebSocketProtocolHeader: "v4.channel.k8s.io,v3.channel.k8s.io,v2.channel.k8s.io", + }, + expected: false, + }, + "Websocket connection upgrade headers with multiple protocols including v5 and spaces is true": { + headers: map[string]string{ + "Connection": "upgrade", + "Upgrade": "websocket", + WebSocketProtocolHeader: "v5.channel.k8s.io, v4.channel.k8s.io", + }, + expected: true, + }, + "Websocket connection upgrade headers with multiple protocols out of order including v5 and spaces is true": { + headers: map[string]string{ + "Connection": "upgrade", + "Upgrade": "websocket", + WebSocketProtocolHeader: "v4.channel.k8s.io, v5.channel.k8s.io, v3.channel.k8s.io", + }, + expected: true, + }, + + "Websocket connection upgrade headers key is case-insensitive": { + headers: map[string]string{ + "Connection": "upgrade", + "Upgrade": "websocket", + "sec-websocket-protocol": "v4.channel.k8s.io, v5.channel.k8s.io, v3.channel.k8s.io", + }, + expected: true, + }, + } + + for name, test := range tests { + req, err := http.NewRequest("GET", "http://www.example.com/", nil) + require.NoError(t, err) + for key, value := range test.headers { + req.Header.Add(key, value) + } + actual := IsWebSocketRequestWithStreamCloseProtocol(req) + assert.Equal(t, test.expected, actual, "%s: expected (%t), got (%t)", name, test.expected, actual) + } +} + +func TestProtocolSupportsStreamClose(t *testing.T) { + tests := map[string]struct { + protocol string + expected bool + }{ + "empty protocol returns false": { + protocol: "", + expected: false, + }, + "not binary protocol returns false": { + protocol: "base64.channel.k8s.io", + expected: false, + }, + "V1 protocol returns false": { + protocol: "channel.k8s.io", + expected: false, + }, + "V4 protocol returns false": { + protocol: "v4.channel.k8s.io", + expected: false, + }, + "V5 protocol returns true": { + protocol: "v5.channel.k8s.io", + expected: true, + }, + "V5 protocol wrong case returns false": { + protocol: "V5.channel.K8S.io", + expected: false, + }, + } + + for name, test := range tests { + actual := protocolSupportsStreamClose(test.protocol) + assert.Equal(t, test.expected, actual, + "%s: expected (%t), got (%t)", name, test.expected, actual) + } +} diff --git a/pkg/util/proxy/dial.go b/pkg/util/proxy/dial.go index 4ceb2e06e..e5196d1ee 100644 --- a/pkg/util/proxy/dial.go +++ b/pkg/util/proxy/dial.go @@ -29,12 +29,12 @@ import ( "k8s.io/klog/v2" ) -// dialURL will dial the specified URL using the underlying dialer held by the passed +// DialURL will dial the specified URL using the underlying dialer held by the passed // RoundTripper. The primary use of this method is to support proxying upgradable connections. // For this reason this method will prefer to negotiate http/1.1 if the URL scheme is https. // If you wish to ensure ALPN negotiates http2 then set NextProto=[]string{"http2"} in the // TLSConfig of the http.Transport -func dialURL(ctx context.Context, url *url.URL, transport http.RoundTripper) (net.Conn, error) { +func DialURL(ctx context.Context, url *url.URL, transport http.RoundTripper) (net.Conn, error) { dialAddr := netutil.CanonicalAddr(url) dialer, err := utilnet.DialerFor(transport) diff --git a/pkg/util/proxy/dial_test.go b/pkg/util/proxy/dial_test.go index 32e951e61..488e878b7 100644 --- a/pkg/util/proxy/dial_test.go +++ b/pkg/util/proxy/dial_test.go @@ -143,7 +143,7 @@ func TestDialURL(t *testing.T) { u, _ := url.Parse(ts.URL) _, p, _ := net.SplitHostPort(u.Host) u.Host = net.JoinHostPort("127.0.0.1", p) - conn, err := dialURL(context.Background(), u, transport) + conn, err := DialURL(context.Background(), u, transport) // Make sure dialing doesn't mutate the transport's TLSConfig if !reflect.DeepEqual(tc.TLSConfig, tlsConfigCopy) { diff --git a/pkg/util/proxy/upgradeaware.go b/pkg/util/proxy/upgradeaware.go index ac2ada547..76acdfb4a 100644 --- a/pkg/util/proxy/upgradeaware.go +++ b/pkg/util/proxy/upgradeaware.go @@ -492,7 +492,7 @@ func getResponse(r io.Reader) (*http.Response, []byte, error) { // dial dials the backend at req.URL and writes req to it. func dial(req *http.Request, transport http.RoundTripper) (net.Conn, error) { - conn, err := dialURL(req.Context(), req.URL, transport) + conn, err := DialURL(req.Context(), req.URL, transport) if err != nil { return nil, fmt.Errorf("error dialing backend: %v", err) }