From 9b80f4d5d94d3ae509e8f1b72feace56de36115c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fabio=20Gr=C3=A4tz?= Date: Sat, 9 Sep 2023 10:59:00 +0200 Subject: [PATCH 01/16] Add proxyCommand to client config MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Fabio Grätz --- clients/go/admin/config.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/clients/go/admin/config.go b/clients/go/admin/config.go index e6c5ad06d..3ce329e4a 100644 --- a/clients/go/admin/config.go +++ b/clients/go/admin/config.go @@ -74,6 +74,8 @@ type Config struct { Command []string `json:"command" pflag:",Command for external authentication token generation"` + ProxyCommand []string `json:"proxyCommand" pflag:",Command for external proxy-authorization token generation"` + // Set the gRPC service config formatted as a json string https://github.com/grpc/grpc/blob/master/doc/service_config.md // eg. {"loadBalancingConfig": [{"round_robin":{}}], "methodConfig": [{"name":[{"service": "foo", "method": "bar"}, {"service": "baz"}], "timeout": "1.000000001s"}]} // find the full schema here https://github.com/grpc/grpc-proto/blob/master/grpc/service_config/service_config.proto#L625 From d6a2ac84fb49ed8fdcb4be42b83624448484cd96 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fabio=20Gr=C3=A4tz?= Date: Sun, 10 Sep 2023 12:59:05 +0200 Subject: [PATCH 02/16] Add proxy auth unary interceptor MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Fabio Grätz --- clients/go/admin/auth_interceptor.go | 45 +++++++++++++++++++++++ clients/go/admin/client.go | 2 + clients/go/admin/token_source_provider.go | 8 ++++ 3 files changed, 55 insertions(+) diff --git a/clients/go/admin/auth_interceptor.go b/clients/go/admin/auth_interceptor.go index 340326784..a36ff49f3 100644 --- a/clients/go/admin/auth_interceptor.go +++ b/clients/go/admin/auth_interceptor.go @@ -48,6 +48,26 @@ func MaterializeCredentials(ctx context.Context, cfg *Config, tokenCache cache.T return nil } +// MaterializeCredentials will attempt to build a TokenSource given the anonymously available information exposed by the server. +// Once established, it'll invoke PerRPCCredentialsFuture.Store() on perRPCCredentials to populate it with the appropriate values. +func MaterializeProxyAuthCredentials(ctx context.Context, cfg *Config, tokenCache cache.TokenCache, perRPCCredentials *PerRPCCredentialsFuture) error { + tokenSourceProvider, err := NewProxyTokenSourceProvider(ctx, cfg, tokenCache) + if err != nil { + return fmt.Errorf("failed to initialized proxy authorization token source provider. Err: %w", err) + } + + authorizationMetadataKey := "proxy-authorization" + + proxyTokenSource, err := tokenSourceProvider.GetTokenSource(ctx) + if err != nil { + return err + } + + wrappedTokenSource := NewCustomHeaderTokenSource(proxyTokenSource, cfg.UseInsecureConnection, authorizationMetadataKey) + perRPCCredentials.Store(wrappedTokenSource) + return nil +} + func shouldAttemptToAuthenticate(errorCode codes.Code) bool { return errorCode == codes.Unauthenticated } @@ -102,3 +122,28 @@ func NewAuthInterceptor(cfg *Config, tokenCache cache.TokenCache, credentialsFut return err } } + +func NewProxyAuthInterceptor(cfg *Config, tokenCache cache.TokenCache, credentialsFuture *PerRPCCredentialsFuture) grpc.UnaryClientInterceptor { + return func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error { + ctx = setHTTPClientContext(ctx, cfg) + + err := invoker(ctx, method, req, reply, cc, opts...) + if err != nil { + logger.Debugf(ctx, "Request failed due to [%v]. If it's an unauthenticated error, we will attempt to establish an authenticated context.", err) + + // st, ok := status.FromError(err) + //if ok { + // If the error we receive from executing the request expects + //if shouldAttemptToAuthenticate(st.Code()) { + fmt.Println("Would attempt to attach proxy-auth header here") + newErr := MaterializeProxyAuthCredentials(ctx, cfg, tokenCache, credentialsFuture) + if newErr != nil { + return fmt.Errorf("proxy-authorization error! Original Error: %v, Auth Error: %w", err, newErr) + } + return invoker(ctx, method, req, reply, cc, opts...) + // } + // } + } + return err + } +} diff --git a/clients/go/admin/client.go b/clients/go/admin/client.go index 830c86fe8..7f2699d1a 100644 --- a/clients/go/admin/client.go +++ b/clients/go/admin/client.go @@ -172,7 +172,9 @@ func InitializeAdminClient(ctx context.Context, cfg *Config, opts ...grpc.DialOp // for the process. Note that if called with different cfg/dialoptions, it will not refresh the connection. func initializeClients(ctx context.Context, cfg *Config, tokenCache cache.TokenCache, opts ...grpc.DialOption) (*Clientset, error) { credentialsFuture := NewPerRPCCredentialsFuture() + // TODO, only optionally add the proxy authenticator opts = append(opts, + grpc.WithChainUnaryInterceptor(NewProxyAuthInterceptor(cfg, tokenCache, credentialsFuture)), grpc.WithChainUnaryInterceptor(NewAuthInterceptor(cfg, tokenCache, credentialsFuture)), grpc.WithPerRPCCredentials(credentialsFuture)) diff --git a/clients/go/admin/token_source_provider.go b/clients/go/admin/token_source_provider.go index e836bf81a..a2c9d956e 100644 --- a/clients/go/admin/token_source_provider.go +++ b/clients/go/admin/token_source_provider.go @@ -107,6 +107,14 @@ func NewTokenSourceProvider(ctx context.Context, cfg *Config, tokenCache cache.T return tokenProvider, nil } +func NewProxyTokenSourceProvider(ctx context.Context, cfg *Config, tokenCache cache.TokenCache) (TokenSourceProvider, error) { + proxyTokenSourceProvider, err := NewExternalTokenSourceProvider(cfg.ProxyCommand) + if err != nil { + return nil, err + } + return proxyTokenSourceProvider, nil +} + type ExternalTokenSourceProvider struct { command []string } From 410e3bda2e55c06182ba4173d32b7e14c0f8417e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fabio=20Gr=C3=A4tz?= Date: Thu, 14 Sep 2023 20:36:49 +0200 Subject: [PATCH 03/16] Use proxy auth in http client for oauth MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Fabio Grätz --- clients/go/admin/auth_interceptor.go | 83 ++++++++++++------- clients/go/admin/client.go | 10 ++- .../go/admin/pkce/auth_flow_orchestrator.go | 10 ++- clients/go/admin/pkce/handle_app_call_back.go | 5 +- clients/go/admin/token_source_provider.go | 10 +-- 5 files changed, 72 insertions(+), 46 deletions(-) diff --git a/clients/go/admin/auth_interceptor.go b/clients/go/admin/auth_interceptor.go index a36ff49f3..be831aac1 100644 --- a/clients/go/admin/auth_interceptor.go +++ b/clients/go/admin/auth_interceptor.go @@ -11,11 +11,14 @@ import ( "golang.org/x/oauth2" "google.golang.org/grpc/codes" + "google.golang.org/grpc/metadata" "google.golang.org/grpc/status" "google.golang.org/grpc" ) +const ProxyAuthorizationHeader = "proxy-authorization" + // MaterializeCredentials will attempt to build a TokenSource given the anonymously available information exposed by the server. // Once established, it'll invoke PerRPCCredentialsFuture.Store() on perRPCCredentials to populate it with the appropriate values. func MaterializeCredentials(ctx context.Context, cfg *Config, tokenCache cache.TokenCache, perRPCCredentials *PerRPCCredentialsFuture) error { @@ -48,39 +51,70 @@ func MaterializeCredentials(ctx context.Context, cfg *Config, tokenCache cache.T return nil } -// MaterializeCredentials will attempt to build a TokenSource given the anonymously available information exposed by the server. -// Once established, it'll invoke PerRPCCredentialsFuture.Store() on perRPCCredentials to populate it with the appropriate values. -func MaterializeProxyAuthCredentials(ctx context.Context, cfg *Config, tokenCache cache.TokenCache, perRPCCredentials *PerRPCCredentialsFuture) error { - tokenSourceProvider, err := NewProxyTokenSourceProvider(ctx, cfg, tokenCache) +func GetProxyTokenSource(ctx context.Context, cfg *Config) (oauth2.TokenSource, error) { + tokenSourceProvider, err := NewExternalTokenSourceProvider(cfg.ProxyCommand) + if err != nil { + return nil, fmt.Errorf("failed to initialized proxy authorization token source provider. Err: %w", err) + } + proxyTokenSource, err := tokenSourceProvider.GetTokenSource(ctx) if err != nil { - return fmt.Errorf("failed to initialized proxy authorization token source provider. Err: %w", err) + return nil, err } + return proxyTokenSource, nil +} - authorizationMetadataKey := "proxy-authorization" +func MaterializeProxyAuthCredentials(ctx context.Context, cfg *Config) (context.Context, error) { + proxyTokenSource, err := GetProxyTokenSource(ctx, cfg) + if err != nil { + return nil, err + } - proxyTokenSource, err := tokenSourceProvider.GetTokenSource(ctx) + token, err := proxyTokenSource.Token() if err != nil { - return err + return nil, err } + md := metadata.Pairs(ProxyAuthorizationHeader, "Bearer "+token.AccessToken) + ctx = metadata.NewOutgoingContext(ctx, md) - wrappedTokenSource := NewCustomHeaderTokenSource(proxyTokenSource, cfg.UseInsecureConnection, authorizationMetadataKey) - perRPCCredentials.Store(wrappedTokenSource) - return nil + return ctx, nil } func shouldAttemptToAuthenticate(errorCode codes.Code) bool { return errorCode == codes.Unauthenticated } +type proxyAuthTransport struct { + transport http.RoundTripper + tokenSource oauth2.TokenSource +} + +func (c *proxyAuthTransport) RoundTrip(req *http.Request) (*http.Response, error) { + token, err := c.tokenSource.Token() + if err != nil { + return nil, err + } + req.Header.Add(ProxyAuthorizationHeader, "Bearer "+token.AccessToken) + return c.transport.RoundTrip(req) +} + // Set up http client used in oauth2 func setHTTPClientContext(ctx context.Context, cfg *Config) context.Context { httpClient := &http.Client{} + transport := &http.Transport{} if len(cfg.HTTPProxyURL.String()) > 0 { // create a transport that uses the proxy - transport := &http.Transport{ - Proxy: http.ProxyURL(&cfg.HTTPProxyURL.URL), + transport.Proxy = http.ProxyURL(&cfg.HTTPProxyURL.URL) + } + + if cfg.ProxyCommand != nil { + proxyTokenSource, _ := GetProxyTokenSource(ctx, cfg) + + httpClient.Transport = &proxyAuthTransport{ + transport: transport, + tokenSource: proxyTokenSource, } + } else { httpClient.Transport = transport } @@ -123,27 +157,12 @@ func NewAuthInterceptor(cfg *Config, tokenCache cache.TokenCache, credentialsFut } } -func NewProxyAuthInterceptor(cfg *Config, tokenCache cache.TokenCache, credentialsFuture *PerRPCCredentialsFuture) grpc.UnaryClientInterceptor { +func NewProxyAuthInterceptor(cfg *Config) grpc.UnaryClientInterceptor { return func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error { - ctx = setHTTPClientContext(ctx, cfg) - - err := invoker(ctx, method, req, reply, cc, opts...) + ctx, err := MaterializeProxyAuthCredentials(ctx, cfg) if err != nil { - logger.Debugf(ctx, "Request failed due to [%v]. If it's an unauthenticated error, we will attempt to establish an authenticated context.", err) - - // st, ok := status.FromError(err) - //if ok { - // If the error we receive from executing the request expects - //if shouldAttemptToAuthenticate(st.Code()) { - fmt.Println("Would attempt to attach proxy-auth header here") - newErr := MaterializeProxyAuthCredentials(ctx, cfg, tokenCache, credentialsFuture) - if newErr != nil { - return fmt.Errorf("proxy-authorization error! Original Error: %v, Auth Error: %w", err, newErr) - } - return invoker(ctx, method, req, reply, cc, opts...) - // } - // } + return fmt.Errorf("proxy authorization error! Original Error: %v", err) } - return err + return invoker(ctx, method, req, reply, cc, opts...) } } diff --git a/clients/go/admin/client.go b/clients/go/admin/client.go index 7f2699d1a..b7144631f 100644 --- a/clients/go/admin/client.go +++ b/clients/go/admin/client.go @@ -124,7 +124,7 @@ func NewAdminConnection(ctx context.Context, cfg *Config, opts ...grpc.DialOptio if opts == nil { // Initialize opts list to the potential number of options we will add. Initialization optimizes memory // allocation. - opts = make([]grpc.DialOption, 0, 5) + opts = make([]grpc.DialOption, 0, 6) } if cfg.UseInsecureConnection { @@ -153,6 +153,11 @@ func NewAdminConnection(ctx context.Context, cfg *Config, opts ...grpc.DialOptio opts = append(opts, GetAdditionalAdminClientConfigOptions(cfg)...) + // Ensure proxy auth interceptor is invoked prior to auth interceptor + if cfg.ProxyCommand != nil { + opts = append([]grpc.DialOption{grpc.WithChainUnaryInterceptor(NewProxyAuthInterceptor(cfg))}, opts...) + } + return grpc.Dial(cfg.Endpoint.String(), opts...) } @@ -172,9 +177,8 @@ func InitializeAdminClient(ctx context.Context, cfg *Config, opts ...grpc.DialOp // for the process. Note that if called with different cfg/dialoptions, it will not refresh the connection. func initializeClients(ctx context.Context, cfg *Config, tokenCache cache.TokenCache, opts ...grpc.DialOption) (*Clientset, error) { credentialsFuture := NewPerRPCCredentialsFuture() - // TODO, only optionally add the proxy authenticator + opts = append(opts, - grpc.WithChainUnaryInterceptor(NewProxyAuthInterceptor(cfg, tokenCache, credentialsFuture)), grpc.WithChainUnaryInterceptor(NewAuthInterceptor(cfg, tokenCache, credentialsFuture)), grpc.WithPerRPCCredentials(credentialsFuture)) diff --git a/clients/go/admin/pkce/auth_flow_orchestrator.go b/clients/go/admin/pkce/auth_flow_orchestrator.go index e2eee411d..a71841d6c 100644 --- a/clients/go/admin/pkce/auth_flow_orchestrator.go +++ b/clients/go/admin/pkce/auth_flow_orchestrator.go @@ -2,6 +2,7 @@ package pkce import ( "context" + "errors" "fmt" "net/http" "net/url" @@ -63,8 +64,15 @@ func (f TokenOrchestrator) FetchTokenFromAuthFlow(ctx context.Context) (*oauth2. serveMux := http.NewServeMux() server := &http.Server{Addr: redirectURL.Host, Handler: serveMux, ReadHeaderTimeout: 0} // Register the call back handler + + // Pass along http client used in oauth2 + httpClient, ok := ctx.Value(oauth2.HTTPClient).(*http.Client) + if !ok { + return nil, errors.New("Unable to retrieve httpClient used in oauth2 from context") + } + serveMux.HandleFunc(redirectURL.Path, getAuthServerCallbackHandler(f.ClientConfig, pkceCodeVerifier, - tokenChannel, errorChannel, stateString)) // the oauth2 callback endpoint + tokenChannel, errorChannel, stateString, httpClient)) // the oauth2 callback endpoint defer server.Close() go func() { diff --git a/clients/go/admin/pkce/handle_app_call_back.go b/clients/go/admin/pkce/handle_app_call_back.go index 0874a5a4c..470f639fa 100644 --- a/clients/go/admin/pkce/handle_app_call_back.go +++ b/clients/go/admin/pkce/handle_app_call_back.go @@ -11,7 +11,7 @@ import ( ) func getAuthServerCallbackHandler(c *oauth.Config, codeVerifier string, tokenChannel chan *oauth2.Token, - errorChannel chan error, stateString string) func(rw http.ResponseWriter, req *http.Request) { + errorChannel chan error, stateString string, client *http.Client) func(rw http.ResponseWriter, req *http.Request) { return func(rw http.ResponseWriter, req *http.Request) { _, _ = rw.Write([]byte(`

Flyte Authentication

`)) @@ -43,7 +43,8 @@ func getAuthServerCallbackHandler(c *oauth.Config, codeVerifier string, tokenCha var opts []oauth2.AuthCodeOption opts = append(opts, oauth2.SetAuthURLParam("code_verifier", codeVerifier)) - token, err := c.Exchange(context.Background(), req.URL.Query().Get("code"), opts...) + ctx := context.WithValue(context.Background(), oauth2.HTTPClient, client) + token, err := c.Exchange(ctx, req.URL.Query().Get("code"), opts...) if err != nil { errorChannel <- fmt.Errorf("error while exchanging auth code due to %v", err) _, _ = rw.Write([]byte(fmt.Sprintf(`

Couldn't get access token due to error: %s

`, err.Error()))) diff --git a/clients/go/admin/token_source_provider.go b/clients/go/admin/token_source_provider.go index a2c9d956e..541f8feb2 100644 --- a/clients/go/admin/token_source_provider.go +++ b/clients/go/admin/token_source_provider.go @@ -107,14 +107,6 @@ func NewTokenSourceProvider(ctx context.Context, cfg *Config, tokenCache cache.T return tokenProvider, nil } -func NewProxyTokenSourceProvider(ctx context.Context, cfg *Config, tokenCache cache.TokenCache) (TokenSourceProvider, error) { - proxyTokenSourceProvider, err := NewExternalTokenSourceProvider(cfg.ProxyCommand) - if err != nil { - return nil, err - } - return proxyTokenSourceProvider, nil -} - type ExternalTokenSourceProvider struct { command []string } @@ -159,6 +151,8 @@ func GetPKCEAuthTokenSource(ctx context.Context, pkceTokenOrchestrator pkce.Toke logger.Warnf(ctx, "Failed fetching from cache. Will restart the flow. Error: %v", err) } + authToken = nil + if authToken == nil { // Fetch using auth flow if authToken, err = pkceTokenOrchestrator.FetchTokenFromAuthFlow(ctx); err != nil { From e435a3aa9f4e90c8425c7ce2a0fa39e3af5bc267 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fabio=20Gr=C3=A4tz?= Date: Sat, 16 Sep 2023 09:56:59 +0200 Subject: [PATCH 04/16] Cache tokens obtained from external commands MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Fabio Grätz --- clients/go/admin/auth_interceptor.go | 36 ++++++++------ clients/go/admin/client.go | 18 +++---- clients/go/admin/client_builder.go | 19 ++++++-- clients/go/admin/externalprocess/token.go | 37 ++++++++++++++ .../go/admin/externalprocess/token_test.go | 24 ++++++++++ clients/go/admin/token_source_provider.go | 48 ++++++++++++++----- go.mod | 1 + go.sum | 2 + 8 files changed, 146 insertions(+), 39 deletions(-) create mode 100644 clients/go/admin/externalprocess/token.go create mode 100644 clients/go/admin/externalprocess/token_test.go diff --git a/clients/go/admin/auth_interceptor.go b/clients/go/admin/auth_interceptor.go index be831aac1..949468fbc 100644 --- a/clients/go/admin/auth_interceptor.go +++ b/clients/go/admin/auth_interceptor.go @@ -21,8 +21,8 @@ const ProxyAuthorizationHeader = "proxy-authorization" // MaterializeCredentials will attempt to build a TokenSource given the anonymously available information exposed by the server. // Once established, it'll invoke PerRPCCredentialsFuture.Store() on perRPCCredentials to populate it with the appropriate values. -func MaterializeCredentials(ctx context.Context, cfg *Config, tokenCache cache.TokenCache, perRPCCredentials *PerRPCCredentialsFuture) error { - authMetadataClient, err := InitializeAuthMetadataClient(ctx, cfg) +func MaterializeCredentials(ctx context.Context, cfg *Config, tokenCache cache.TokenCache, proxyTokenCache cache.TokenCache, perRPCCredentials *PerRPCCredentialsFuture) error { + authMetadataClient, err := InitializeAuthMetadataClient(ctx, cfg, proxyTokenCache) if err != nil { return fmt.Errorf("failed to initialized Auth Metadata Client. Error: %w", err) } @@ -51,8 +51,8 @@ func MaterializeCredentials(ctx context.Context, cfg *Config, tokenCache cache.T return nil } -func GetProxyTokenSource(ctx context.Context, cfg *Config) (oauth2.TokenSource, error) { - tokenSourceProvider, err := NewExternalTokenSourceProvider(cfg.ProxyCommand) +func GetProxyTokenSource(ctx context.Context, cfg *Config, proxyTokenCache cache.TokenCache) (oauth2.TokenSource, error) { + tokenSourceProvider, err := NewExternalTokenSourceProvider(cfg.ProxyCommand, proxyTokenCache) if err != nil { return nil, fmt.Errorf("failed to initialized proxy authorization token source provider. Err: %w", err) } @@ -63,8 +63,8 @@ func GetProxyTokenSource(ctx context.Context, cfg *Config) (oauth2.TokenSource, return proxyTokenSource, nil } -func MaterializeProxyAuthCredentials(ctx context.Context, cfg *Config) (context.Context, error) { - proxyTokenSource, err := GetProxyTokenSource(ctx, cfg) +func MaterializeProxyAuthCredentials(ctx context.Context, cfg *Config, proxyTokenCache cache.TokenCache) (context.Context, error) { + proxyTokenSource, err := GetProxyTokenSource(ctx, cfg, proxyTokenCache) if err != nil { return nil, err } @@ -98,7 +98,7 @@ func (c *proxyAuthTransport) RoundTrip(req *http.Request) (*http.Response, error } // Set up http client used in oauth2 -func setHTTPClientContext(ctx context.Context, cfg *Config) context.Context { +func setHTTPClientContext(ctx context.Context, cfg *Config, proxyTokenCache cache.TokenCache) (context.Context, error) { httpClient := &http.Client{} transport := &http.Transport{} @@ -108,7 +108,10 @@ func setHTTPClientContext(ctx context.Context, cfg *Config) context.Context { } if cfg.ProxyCommand != nil { - proxyTokenSource, _ := GetProxyTokenSource(ctx, cfg) + proxyTokenSource, err := GetProxyTokenSource(ctx, cfg, proxyTokenCache) + if err != nil { + return nil, err + } httpClient.Transport = &proxyAuthTransport{ transport: transport, @@ -118,7 +121,7 @@ func setHTTPClientContext(ctx context.Context, cfg *Config) context.Context { httpClient.Transport = transport } - return context.WithValue(ctx, oauth2.HTTPClient, httpClient) + return context.WithValue(ctx, oauth2.HTTPClient, httpClient), nil } // NewAuthInterceptor creates a new grpc.UnaryClientInterceptor that forwards the grpc call and inspects the error. @@ -131,11 +134,14 @@ func setHTTPClientContext(ctx context.Context, cfg *Config) context.Context { // more. It'll fail hard if it couldn't do so (i.e. it will no longer attempt to send an unauthenticated request). Once // a token source has been created, it'll invoke the grpc pipeline again, this time the grpc.PerRPCCredentials should // be able to find and acquire a valid AccessToken to annotate the request with. -func NewAuthInterceptor(cfg *Config, tokenCache cache.TokenCache, credentialsFuture *PerRPCCredentialsFuture) grpc.UnaryClientInterceptor { +func NewAuthInterceptor(cfg *Config, tokenCache cache.TokenCache, proxyTokenCache cache.TokenCache, credentialsFuture *PerRPCCredentialsFuture) grpc.UnaryClientInterceptor { return func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error { - ctx = setHTTPClientContext(ctx, cfg) + ctx, err := setHTTPClientContext(ctx, cfg, proxyTokenCache) + if err != nil { + return err + } - err := invoker(ctx, method, req, reply, cc, opts...) + err = invoker(ctx, method, req, reply, cc, opts...) if err != nil { logger.Debugf(ctx, "Request failed due to [%v]. If it's an unauthenticated error, we will attempt to establish an authenticated context.", err) @@ -143,7 +149,7 @@ func NewAuthInterceptor(cfg *Config, tokenCache cache.TokenCache, credentialsFut // If the error we receive from executing the request expects if shouldAttemptToAuthenticate(st.Code()) { logger.Debugf(ctx, "Request failed due to [%v]. Attempting to establish an authenticated connection and trying again.", st.Code()) - newErr := MaterializeCredentials(ctx, cfg, tokenCache, credentialsFuture) + newErr := MaterializeCredentials(ctx, cfg, tokenCache, proxyTokenCache, credentialsFuture) if newErr != nil { return fmt.Errorf("authentication error! Original Error: %v, Auth Error: %w", err, newErr) } @@ -157,9 +163,9 @@ func NewAuthInterceptor(cfg *Config, tokenCache cache.TokenCache, credentialsFut } } -func NewProxyAuthInterceptor(cfg *Config) grpc.UnaryClientInterceptor { +func NewProxyAuthInterceptor(cfg *Config, proxyTokenCache cache.TokenCache) grpc.UnaryClientInterceptor { return func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error { - ctx, err := MaterializeProxyAuthCredentials(ctx, cfg) + ctx, err := MaterializeProxyAuthCredentials(ctx, cfg, proxyTokenCache) if err != nil { return fmt.Errorf("proxy authorization error! Original Error: %v", err) } diff --git a/clients/go/admin/client.go b/clients/go/admin/client.go index b7144631f..2dc94980d 100644 --- a/clients/go/admin/client.go +++ b/clients/go/admin/client.go @@ -110,9 +110,9 @@ func getAuthenticationDialOption(ctx context.Context, cfg *Config, tokenSourcePr } // InitializeAuthMetadataClient creates a new anonymously Auth Metadata Service client. -func InitializeAuthMetadataClient(ctx context.Context, cfg *Config) (client service.AuthMetadataServiceClient, err error) { +func InitializeAuthMetadataClient(ctx context.Context, cfg *Config, proxyTokenCache cache.TokenCache) (client service.AuthMetadataServiceClient, err error) { // Create an unauthenticated connection to fetch AuthMetadata - authMetadataConnection, err := NewAdminConnection(ctx, cfg) + authMetadataConnection, err := NewAdminConnection(ctx, cfg, proxyTokenCache) if err != nil { return nil, fmt.Errorf("failed to initialized admin connection. Error: %w", err) } @@ -120,7 +120,7 @@ func InitializeAuthMetadataClient(ctx context.Context, cfg *Config) (client serv return service.NewAuthMetadataServiceClient(authMetadataConnection), nil } -func NewAdminConnection(ctx context.Context, cfg *Config, opts ...grpc.DialOption) (*grpc.ClientConn, error) { +func NewAdminConnection(ctx context.Context, cfg *Config, proxyTokenCache cache.TokenCache, opts ...grpc.DialOption) (*grpc.ClientConn, error) { if opts == nil { // Initialize opts list to the potential number of options we will add. Initialization optimizes memory // allocation. @@ -155,7 +155,7 @@ func NewAdminConnection(ctx context.Context, cfg *Config, opts ...grpc.DialOptio // Ensure proxy auth interceptor is invoked prior to auth interceptor if cfg.ProxyCommand != nil { - opts = append([]grpc.DialOption{grpc.WithChainUnaryInterceptor(NewProxyAuthInterceptor(cfg))}, opts...) + opts = append([]grpc.DialOption{grpc.WithChainUnaryInterceptor(NewProxyAuthInterceptor(cfg, proxyTokenCache))}, opts...) } return grpc.Dial(cfg.Endpoint.String(), opts...) @@ -164,7 +164,7 @@ func NewAdminConnection(ctx context.Context, cfg *Config, opts ...grpc.DialOptio // InitializeAdminClient creates an AdminClient with a shared Admin connection for the process // Deprecated: Please use initializeClients instead. func InitializeAdminClient(ctx context.Context, cfg *Config, opts ...grpc.DialOption) service.AdminServiceClient { - set, err := initializeClients(ctx, cfg, nil, opts...) + set, err := initializeClients(ctx, cfg, nil, nil, opts...) if err != nil { logger.Panicf(ctx, "Failed to initialized client. Error: %v", err) return nil @@ -175,18 +175,18 @@ func InitializeAdminClient(ctx context.Context, cfg *Config, opts ...grpc.DialOp // initializeClients creates an AdminClient, AuthServiceClient and IdentityServiceClient with a shared Admin connection // for the process. Note that if called with different cfg/dialoptions, it will not refresh the connection. -func initializeClients(ctx context.Context, cfg *Config, tokenCache cache.TokenCache, opts ...grpc.DialOption) (*Clientset, error) { +func initializeClients(ctx context.Context, cfg *Config, tokenCache cache.TokenCache, proxyTokenCache cache.TokenCache, opts ...grpc.DialOption) (*Clientset, error) { credentialsFuture := NewPerRPCCredentialsFuture() opts = append(opts, - grpc.WithChainUnaryInterceptor(NewAuthInterceptor(cfg, tokenCache, credentialsFuture)), + grpc.WithChainUnaryInterceptor(NewAuthInterceptor(cfg, tokenCache, proxyTokenCache, credentialsFuture)), grpc.WithPerRPCCredentials(credentialsFuture)) if cfg.DefaultServiceConfig != "" { opts = append(opts, grpc.WithDefaultServiceConfig(cfg.DefaultServiceConfig)) } - adminConnection, err := NewAdminConnection(ctx, cfg, opts...) + adminConnection, err := NewAdminConnection(ctx, cfg, proxyTokenCache, opts...) if err != nil { logger.Panicf(ctx, "failed to initialized Admin connection. Err: %s", err.Error()) } @@ -204,7 +204,7 @@ func initializeClients(ctx context.Context, cfg *Config, tokenCache cache.TokenC // Deprecated: Please use NewClientsetBuilder() instead. func InitializeAdminClientFromConfig(ctx context.Context, tokenCache cache.TokenCache, opts ...grpc.DialOption) (service.AdminServiceClient, error) { - clientSet, err := initializeClients(ctx, GetConfig(ctx), tokenCache, opts...) + clientSet, err := initializeClients(ctx, GetConfig(ctx), tokenCache, nil, opts...) if err != nil { return nil, err } diff --git a/clients/go/admin/client_builder.go b/clients/go/admin/client_builder.go index a16480739..d5ee75a6c 100644 --- a/clients/go/admin/client_builder.go +++ b/clients/go/admin/client_builder.go @@ -10,9 +10,10 @@ import ( // ClientsetBuilder is used to build the clientset. This allows custom token cache implementations to be plugged in. type ClientsetBuilder struct { - config *Config - tokenCache cache.TokenCache - opts []grpc.DialOption + config *Config + tokenCache cache.TokenCache + proxyTokenCache cache.TokenCache + opts []grpc.DialOption } // ClientSetBuilder is constructor function to be used by the clients in interacting with the builder @@ -32,6 +33,13 @@ func (cb *ClientsetBuilder) WithTokenCache(tokenCache cache.TokenCache) *Clients return cb } +// TokenCache is designed to cache a single token. When clients choose to send `"proxy-authorization`" +// headers, we, thus, employ a separate token cache. +func (cb *ClientsetBuilder) WithProxyTokenCache(tokenCache cache.TokenCache) *ClientsetBuilder { + cb.proxyTokenCache = tokenCache + return cb +} + func (cb *ClientsetBuilder) WithDialOptions(opts ...grpc.DialOption) *ClientsetBuilder { cb.opts = opts return cb @@ -42,12 +50,15 @@ func (cb *ClientsetBuilder) Build(ctx context.Context) (*Clientset, error) { if cb.tokenCache == nil { cb.tokenCache = &cache.TokenCacheInMemoryProvider{} } + if cb.proxyTokenCache == nil { + cb.proxyTokenCache = &cache.TokenCacheInMemoryProvider{} + } if cb.config == nil { cb.config = GetConfig(ctx) } - return initializeClients(ctx, cb.config, cb.tokenCache, cb.opts...) + return initializeClients(ctx, cb.config, cb.tokenCache, cb.proxyTokenCache, cb.opts...) } func NewClientsetBuilder() *ClientsetBuilder { diff --git a/clients/go/admin/externalprocess/token.go b/clients/go/admin/externalprocess/token.go new file mode 100644 index 000000000..b262f3e26 --- /dev/null +++ b/clients/go/admin/externalprocess/token.go @@ -0,0 +1,37 @@ +package externalprocess + +import ( + "encoding/base64" + "encoding/json" + "errors" + "strings" + "time" +) + +type jwtClaims struct { + Exp int64 `json:"exp"` +} + +// When we receive a token from an external process, we don't have a key to validate it +// For caching purposes, we still want to know when the token *allegedly* expires. +func GetUnvalidatedTokenExpiration(tokenString string) (time.Time, error) { + parts := strings.Split(tokenString, ".") + if len(parts) != 3 { + return time.Time{}, errors.New("invalid token") + } + + payload, err := base64.RawURLEncoding.DecodeString(parts[1]) + if err != nil { + return time.Time{}, err + } + + var claims jwtClaims + err = json.Unmarshal(payload, &claims) + if err != nil { + return time.Time{}, err + } + + expiry := time.Unix(claims.Exp, 0) + + return expiry, nil +} diff --git a/clients/go/admin/externalprocess/token_test.go b/clients/go/admin/externalprocess/token_test.go new file mode 100644 index 000000000..0b7a077c3 --- /dev/null +++ b/clients/go/admin/externalprocess/token_test.go @@ -0,0 +1,24 @@ +package externalprocess + +import ( + "encoding/base64" + "encoding/json" + "testing" + "time" + + "github.com/stretchr/testify/assert" +) + +func TestGetUnvalidatedTokenExpiration(t *testing.T) { + + exp := time.Now().Unix() + claims := jwtClaims{Exp: exp} + payload, _ := json.Marshal(claims) + token := "header." + base64.RawURLEncoding.EncodeToString(payload) + ".signature" + + expiry, err := GetUnvalidatedTokenExpiration(token) + if err != nil { + t.Errorf("Unexpected error: %v", err) + } + assert.Equal(t, exp, expiry.Unix()) +} diff --git a/clients/go/admin/token_source_provider.go b/clients/go/admin/token_source_provider.go index 541f8feb2..695273b33 100644 --- a/clients/go/admin/token_source_provider.go +++ b/clients/go/admin/token_source_provider.go @@ -8,6 +8,7 @@ import ( "os" "strings" "sync" + "time" "golang.org/x/oauth2" "golang.org/x/oauth2/clientcredentials" @@ -86,7 +87,7 @@ func NewTokenSourceProvider(ctx context.Context, cfg *Config, tokenCache cache.T return nil, err } case AuthTypeExternalCommand: - tokenProvider, err = NewExternalTokenSourceProvider(cfg.Command) + tokenProvider, err = NewExternalTokenSourceProvider(cfg.Command, tokenCache) if err != nil { return nil, err } @@ -107,24 +108,51 @@ func NewTokenSourceProvider(ctx context.Context, cfg *Config, tokenCache cache.T return tokenProvider, nil } -type ExternalTokenSourceProvider struct { +type ExternalTokenSource struct { command []string } -func NewExternalTokenSourceProvider(command []string) (TokenSourceProvider, error) { - return &ExternalTokenSourceProvider{command: command}, nil +func NewExternalTokenSource(command []string) oauth2.TokenSource { + return &ExternalTokenSource{command: command} } -func (e ExternalTokenSourceProvider) GetTokenSource(ctx context.Context) (oauth2.TokenSource, error) { +func (e *ExternalTokenSource) Token() (*oauth2.Token, error) { output, err := externalprocess.Execute(e.command) if err != nil { return nil, err } - - return oauth2.StaticTokenSource(&oauth2.Token{ - AccessToken: strings.Trim(string(output), "\t \n"), + token := strings.Trim(string(output), "\t \n") + exp, err := externalprocess.GetUnvalidatedTokenExpiration(token) + if err != nil { + // If we cannot extract an expiration, as a precaution, we do not + // want to cache the token as otherwise the external command would + // never be called again. Note that `exp = time.Time{}` would cause + // the token to be considered valid forever. + exp = time.Unix(0, 0) + } + return &oauth2.Token{ + AccessToken: token, TokenType: "bearer", - }), nil + Expiry: exp, + }, nil +} + +type ExternalTokenSourceProvider struct { + command []string + tokenCache cache.TokenCache +} + +func NewExternalTokenSourceProvider(command []string, tokenCache cache.TokenCache) (TokenSourceProvider, error) { + return &ExternalTokenSourceProvider{command: command, tokenCache: tokenCache}, nil +} + +func (e *ExternalTokenSourceProvider) GetTokenSource(ctx context.Context) (oauth2.TokenSource, error) { + return &customTokenSource{ + ctx: ctx, + new: NewExternalTokenSource(e.command), + mu: sync.Mutex{}, + tokenCache: e.tokenCache, + }, nil } type PKCETokenSourceProvider struct { @@ -151,8 +179,6 @@ func GetPKCEAuthTokenSource(ctx context.Context, pkceTokenOrchestrator pkce.Toke logger.Warnf(ctx, "Failed fetching from cache. Will restart the flow. Error: %v", err) } - authToken = nil - if authToken == nil { // Fetch using auth flow if authToken, err = pkceTokenOrchestrator.FetchTokenFromAuthFlow(ctx); err != nil { diff --git a/go.mod b/go.mod index 3aacab796..673b0c7f1 100644 --- a/go.mod +++ b/go.mod @@ -6,6 +6,7 @@ require ( github.com/antihax/optional v1.0.0 github.com/flyteorg/flytestdlib v1.0.0 github.com/go-test/deep v1.0.7 + github.com/golang-jwt/jwt v3.2.2+incompatible github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b github.com/golang/protobuf v1.4.3 github.com/grpc-ecosystem/go-grpc-middleware v1.1.0 diff --git a/go.sum b/go.sum index b2eab61f1..d22db3d61 100644 --- a/go.sum +++ b/go.sum @@ -205,6 +205,8 @@ github.com/gogo/protobuf v1.2.0/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7a github.com/gogo/protobuf v1.2.1/go.mod h1:hp+jE20tsWTFYpLwKvXlhS1hjn+gTNwPg2I6zVXpSg4= github.com/gogo/protobuf v1.3.1/go.mod h1:SlYgWuQ5SjCEi6WLHjHCa1yvBfUnHcTbrrZtXPKa29o= github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q= +github.com/golang-jwt/jwt v3.2.2+incompatible h1:IfV12K8xAKAnZqdXVzCZ+TOjboZ2keLg81eXfW3O+oY= +github.com/golang-jwt/jwt v3.2.2+incompatible/go.mod h1:8pz2t5EyA70fFQQSrl6XZXzqecmYZeUEB8OUGHkxJ+I= github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b h1:VKtxabqXZkF25pY9ekfRL6a582T4P37/31XEstQ5p58= github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= github.com/golang/groupcache v0.0.0-20160516000752-02826c3e7903/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= From cd5cc578d69916d28960d6e22a86cb554269bf23 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fabio=20Gr=C3=A4tz?= Date: Mon, 18 Sep 2023 23:47:49 +0200 Subject: [PATCH 05/16] Make tests pass MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Fabio Grätz --- clients/go/admin/auth_interceptor_test.go | 12 ++++++------ clients/go/admin/client_test.go | 6 +++--- clients/go/admin/pkce/handle_app_call_back_test.go | 2 +- 3 files changed, 10 insertions(+), 10 deletions(-) diff --git a/clients/go/admin/auth_interceptor_test.go b/clients/go/admin/auth_interceptor_test.go index d5e07a713..56058189f 100644 --- a/clients/go/admin/auth_interceptor_test.go +++ b/clients/go/admin/auth_interceptor_test.go @@ -114,7 +114,7 @@ func newAuthMetadataServer(t testing.TB, port int, impl service.AuthMetadataServ func Test_newAuthInterceptor(t *testing.T) { t.Run("Other Error", func(t *testing.T) { f := NewPerRPCCredentialsFuture() - interceptor := NewAuthInterceptor(&Config{}, &mocks.TokenCache{}, f) + interceptor := NewAuthInterceptor(&Config{}, &mocks.TokenCache{}, &mocks.TokenCache{}, f) otherError := func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, opts ...grpc.CallOption) error { return status.New(codes.Canceled, "").Err() } @@ -150,7 +150,7 @@ func Test_newAuthInterceptor(t *testing.T) { Endpoint: config.URL{URL: *u}, UseInsecureConnection: true, AuthType: AuthTypeClientSecret, - }, &mocks.TokenCache{}, f) + }, &mocks.TokenCache{}, &mocks.TokenCache{}, f) unauthenticated := func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, opts ...grpc.CallOption) error { return status.New(codes.Unauthenticated, "").Err() } @@ -181,7 +181,7 @@ func Test_newAuthInterceptor(t *testing.T) { Endpoint: config.URL{URL: *u}, UseInsecureConnection: true, AuthType: AuthTypeClientSecret, - }, &mocks.TokenCache{}, f) + }, &mocks.TokenCache{}, &mocks.TokenCache{}, f) authenticated := func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, opts ...grpc.CallOption) error { return nil } @@ -220,7 +220,7 @@ func Test_newAuthInterceptor(t *testing.T) { Endpoint: config.URL{URL: *u}, UseInsecureConnection: true, AuthType: AuthTypeClientSecret, - }, &mocks.TokenCache{}, f) + }, &mocks.TokenCache{}, &mocks.TokenCache{}, f) unauthenticated := func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, opts ...grpc.CallOption) error { return status.New(codes.Aborted, "").Err() } @@ -254,7 +254,7 @@ func TestMaterializeCredentials(t *testing.T) { Scopes: []string{"all"}, Audience: "http://localhost:30081", AuthorizationHeader: "authorization", - }, &mocks.TokenCache{}, f) + }, &mocks.TokenCache{}, &mocks.TokenCache{}, f) assert.NoError(t, err) }) t.Run("Failed to fetch client metadata", func(t *testing.T) { @@ -277,7 +277,7 @@ func TestMaterializeCredentials(t *testing.T) { AuthType: AuthTypeClientSecret, TokenURL: fmt.Sprintf("http://localhost:%d/api/v1/token", port), Scopes: []string{"all"}, - }, &mocks.TokenCache{}, f) + }, &mocks.TokenCache{}, &mocks.TokenCache{}, f) assert.EqualError(t, err, "failed to fetch client metadata. Error: rpc error: code = Unknown desc = expected err") }) } diff --git a/clients/go/admin/client_test.go b/clients/go/admin/client_test.go index 017f4e8ff..137ffed1f 100644 --- a/clients/go/admin/client_test.go +++ b/clients/go/admin/client_test.go @@ -77,7 +77,7 @@ func TestGetAdditionalAdminClientConfigOptions(t *testing.T) { }) t.Run("legal-from-config", func(t *testing.T) { - clientSet, err := initializeClients(ctx, &Config{InsecureSkipVerify: true}, nil) + clientSet, err := initializeClients(ctx, &Config{InsecureSkipVerify: true}, nil, nil) assert.NoError(t, err) assert.NotNil(t, clientSet) assert.NotNil(t, clientSet.AuthMetadataClient()) @@ -85,7 +85,7 @@ func TestGetAdditionalAdminClientConfigOptions(t *testing.T) { assert.NotNil(t, clientSet.HealthServiceClient()) }) t.Run("legal-from-config-with-cacerts", func(t *testing.T) { - clientSet, err := initializeClients(ctx, &Config{CACertFilePath: "testdata/root.pem"}, nil) + clientSet, err := initializeClients(ctx, &Config{CACertFilePath: "testdata/root.pem"}, nil, nil) assert.NoError(t, err) assert.NotNil(t, clientSet) assert.NotNil(t, clientSet.AuthMetadataClient()) @@ -105,7 +105,7 @@ func TestGetAdditionalAdminClientConfigOptions(t *testing.T) { } assert.NoError(t, SetConfig(newAdminServiceConfig)) - clientSet, err := initializeClients(ctx, newAdminServiceConfig, nil) + clientSet, err := initializeClients(ctx, newAdminServiceConfig, nil, nil) assert.NotNil(t, err) assert.Nil(t, clientSet) }) diff --git a/clients/go/admin/pkce/handle_app_call_back_test.go b/clients/go/admin/pkce/handle_app_call_back_test.go index 91eed672c..fbad6b5d3 100644 --- a/clients/go/admin/pkce/handle_app_call_back_test.go +++ b/clients/go/admin/pkce/handle_app_call_back_test.go @@ -25,7 +25,7 @@ func HandleAppCallBackSetup(t *testing.T, state string) (tokenChannel chan *oaut errorChannel = make(chan error, 1) tokenChannel = make(chan *oauth2.Token) testAuthConfig = &oauth.Config{Config: &oauth2.Config{}, DeviceEndpoint: "dummyDeviceEndpoint"} - callBackFn = getAuthServerCallbackHandler(testAuthConfig, "", tokenChannel, errorChannel, state) + callBackFn = getAuthServerCallbackHandler(testAuthConfig, "", tokenChannel, errorChannel, state, &http.Client{}) assert.NotNil(t, callBackFn) req = &http.Request{ Method: http.MethodGet, From 27b8e75bccb369cf3e3cc833771c335c30dcfb6f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fabio=20Gr=C3=A4tz?= Date: Mon, 18 Sep 2023 23:48:07 +0200 Subject: [PATCH 06/16] Add tests for proxy auth interceptor MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Fabio Grätz --- clients/go/admin/auth_interceptor_test.go | 100 ++++++++++++++++++++++ 1 file changed, 100 insertions(+) diff --git a/clients/go/admin/auth_interceptor_test.go b/clients/go/admin/auth_interceptor_test.go index 56058189f..9579439fe 100644 --- a/clients/go/admin/auth_interceptor_test.go +++ b/clients/go/admin/auth_interceptor_test.go @@ -15,11 +15,14 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" + "golang.org/x/oauth2" "google.golang.org/grpc" "google.golang.org/grpc/codes" + "google.golang.org/grpc/metadata" "google.golang.org/grpc/status" "k8s.io/apimachinery/pkg/util/rand" + "github.com/flyteorg/flyteidl/clients/go/admin/cache" "github.com/flyteorg/flyteidl/clients/go/admin/cache/mocks" adminMocks "github.com/flyteorg/flyteidl/clients/go/admin/mocks" "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/service" @@ -281,3 +284,100 @@ func TestMaterializeCredentials(t *testing.T) { assert.EqualError(t, err, "failed to fetch client metadata. Error: rpc error: code = Unknown desc = expected err") }) } + +func TestNewProxyAuthInterceptor(t *testing.T) { + cfg := &Config{ + ProxyCommand: []string{"echo", "test-token"}, + } + tokenCache := &cache.TokenCacheInMemoryProvider{} + + interceptor := NewProxyAuthInterceptor(cfg, tokenCache) + + ctx := context.Background() + method := "/test.method" + req := "request" + reply := "reply" + cc := new(grpc.ClientConn) + + testInvoker := func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, opts ...grpc.CallOption) error { + md, _ := metadata.FromOutgoingContext(ctx) + assert.Equal(t, []string{"Bearer test-token"}, md.Get(ProxyAuthorizationHeader)) + return nil + } + + err := interceptor(ctx, method, req, reply, cc, testInvoker) + + assert.NoError(t, err) +} + +type testRoundTripper struct { + RoundTripFunc func(req *http.Request) (*http.Response, error) +} + +func (t *testRoundTripper) RoundTrip(req *http.Request) (*http.Response, error) { + return t.RoundTripFunc(req) +} + +func TestSetHTTPClientContext(t *testing.T) { + ctx := context.Background() + tokenCache := &cache.TokenCacheInMemoryProvider{} + + t.Run("no proxy command and no proxy url", func(t *testing.T) { + cfg := &Config{} + newCtx, err := setHTTPClientContext(ctx, cfg, tokenCache) + assert.NoError(t, err) + + httpClient, ok := newCtx.Value(oauth2.HTTPClient).(*http.Client) + assert.True(t, ok) + + transport, ok := httpClient.Transport.(*http.Transport) + assert.True(t, ok) + assert.Nil(t, transport.Proxy) + }) + + t.Run("proxy url", func(t *testing.T) { + cfg := &Config{ + HTTPProxyURL: config. + URL{URL: url.URL{ + Scheme: "http", + Host: "localhost:8080", + }}, + } + newCtx, err := setHTTPClientContext(ctx, cfg, tokenCache) + assert.NoError(t, err) + + httpClient, ok := newCtx.Value(oauth2.HTTPClient).(*http.Client) + assert.True(t, ok) + + transport, ok := httpClient.Transport.(*http.Transport) + assert.True(t, ok) + assert.NotNil(t, transport.Proxy) + }) + + t.Run("proxy command adds proxy-authorization header", func(t *testing.T) { + cfg := &Config{ + ProxyCommand: []string{"echo", "test-token-http-client"}, + } + newCtx, err := setHTTPClientContext(ctx, cfg, tokenCache) + assert.NoError(t, err) + + httpClient, ok := newCtx.Value(oauth2.HTTPClient).(*http.Client) + assert.True(t, ok) + + pat, ok := httpClient.Transport.(*proxyAuthTransport) + assert.True(t, ok) + + testRoundTripper := &testRoundTripper{ + RoundTripFunc: func(req *http.Request) (*http.Response, error) { + // Check if the ProxyAuthorizationHeader is correctly set + assert.Equal(t, "Bearer test-token-http-client", req.Header.Get(ProxyAuthorizationHeader)) + return &http.Response{StatusCode: http.StatusOK}, nil + }, + } + pat.transport = testRoundTripper + + req, _ := http.NewRequest("GET", "http://example.com", nil) + _, err = httpClient.Do(req) + assert.NoError(t, err) + }) +} From ea9090e43846414c359364f083833cf62a8400b9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fabio=20Gr=C3=A4tz?= Date: Wed, 20 Sep 2023 00:19:11 +0200 Subject: [PATCH 07/16] Make work without 2nd token cache but instead with 2nd credentials future MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Fabio Grätz --- clients/go/admin/auth_interceptor.go | 67 ++++++++++--------- clients/go/admin/client.go | 21 +++--- clients/go/admin/client_builder.go | 19 ++---- clients/go/admin/externalprocess/token.go | 37 ---------- .../go/admin/externalprocess/token_test.go | 24 ------- clients/go/admin/token_source_provider.go | 46 +++---------- 6 files changed, 58 insertions(+), 156 deletions(-) delete mode 100644 clients/go/admin/externalprocess/token.go delete mode 100644 clients/go/admin/externalprocess/token_test.go diff --git a/clients/go/admin/auth_interceptor.go b/clients/go/admin/auth_interceptor.go index 949468fbc..a36ca98bb 100644 --- a/clients/go/admin/auth_interceptor.go +++ b/clients/go/admin/auth_interceptor.go @@ -2,6 +2,7 @@ package admin import ( "context" + "errors" "fmt" "net/http" @@ -11,7 +12,6 @@ import ( "golang.org/x/oauth2" "google.golang.org/grpc/codes" - "google.golang.org/grpc/metadata" "google.golang.org/grpc/status" "google.golang.org/grpc" @@ -21,8 +21,8 @@ const ProxyAuthorizationHeader = "proxy-authorization" // MaterializeCredentials will attempt to build a TokenSource given the anonymously available information exposed by the server. // Once established, it'll invoke PerRPCCredentialsFuture.Store() on perRPCCredentials to populate it with the appropriate values. -func MaterializeCredentials(ctx context.Context, cfg *Config, tokenCache cache.TokenCache, proxyTokenCache cache.TokenCache, perRPCCredentials *PerRPCCredentialsFuture) error { - authMetadataClient, err := InitializeAuthMetadataClient(ctx, cfg, proxyTokenCache) +func MaterializeCredentials(ctx context.Context, cfg *Config, tokenCache cache.TokenCache, perRPCCredentials *PerRPCCredentialsFuture, proxyCredentialsFuture *PerRPCCredentialsFuture) error { + authMetadataClient, err := InitializeAuthMetadataClient(ctx, cfg, proxyCredentialsFuture) if err != nil { return fmt.Errorf("failed to initialized Auth Metadata Client. Error: %w", err) } @@ -51,8 +51,8 @@ func MaterializeCredentials(ctx context.Context, cfg *Config, tokenCache cache.T return nil } -func GetProxyTokenSource(ctx context.Context, cfg *Config, proxyTokenCache cache.TokenCache) (oauth2.TokenSource, error) { - tokenSourceProvider, err := NewExternalTokenSourceProvider(cfg.ProxyCommand, proxyTokenCache) +func GetProxyTokenSource(ctx context.Context, cfg *Config) (oauth2.TokenSource, error) { + tokenSourceProvider, err := NewExternalTokenSourceProvider(cfg.ProxyCommand) if err != nil { return nil, fmt.Errorf("failed to initialized proxy authorization token source provider. Err: %w", err) } @@ -63,18 +63,14 @@ func GetProxyTokenSource(ctx context.Context, cfg *Config, proxyTokenCache cache return proxyTokenSource, nil } -func MaterializeProxyAuthCredentials(ctx context.Context, cfg *Config, proxyTokenCache cache.TokenCache) (context.Context, error) { - proxyTokenSource, err := GetProxyTokenSource(ctx, cfg, proxyTokenCache) +func MaterializeProxyAuthCredentials(ctx context.Context, cfg *Config, proxyCredentialsFuture *PerRPCCredentialsFuture) (context.Context, error) { + proxyTokenSource, err := GetProxyTokenSource(ctx, cfg) if err != nil { return nil, err } - token, err := proxyTokenSource.Token() - if err != nil { - return nil, err - } - md := metadata.Pairs(ProxyAuthorizationHeader, "Bearer "+token.AccessToken) - ctx = metadata.NewOutgoingContext(ctx, md) + wrappedTokenSource := NewCustomHeaderTokenSource(proxyTokenSource, cfg.UseInsecureConnection, ProxyAuthorizationHeader) + proxyCredentialsFuture.Store(wrappedTokenSource) return ctx, nil } @@ -84,21 +80,27 @@ func shouldAttemptToAuthenticate(errorCode codes.Code) bool { } type proxyAuthTransport struct { - transport http.RoundTripper - tokenSource oauth2.TokenSource + transport http.RoundTripper + proxyCredentialsFuture *PerRPCCredentialsFuture } func (c *proxyAuthTransport) RoundTrip(req *http.Request) (*http.Response, error) { - token, err := c.tokenSource.Token() + // check if the proxy credentials future is initialized + if !c.proxyCredentialsFuture.IsInitialized() { + return nil, errors.New("proxy credentials future is not initialized") + } + + metadata, err := c.proxyCredentialsFuture.GetRequestMetadata(context.Background(), "") if err != nil { return nil, err } - req.Header.Add(ProxyAuthorizationHeader, "Bearer "+token.AccessToken) + token := metadata[ProxyAuthorizationHeader] + req.Header.Add(ProxyAuthorizationHeader, token) return c.transport.RoundTrip(req) } // Set up http client used in oauth2 -func setHTTPClientContext(ctx context.Context, cfg *Config, proxyTokenCache cache.TokenCache) (context.Context, error) { +func setHTTPClientContext(ctx context.Context, cfg *Config, proxyCredentialsFuture *PerRPCCredentialsFuture) (context.Context, error) { httpClient := &http.Client{} transport := &http.Transport{} @@ -108,14 +110,9 @@ func setHTTPClientContext(ctx context.Context, cfg *Config, proxyTokenCache cach } if cfg.ProxyCommand != nil { - proxyTokenSource, err := GetProxyTokenSource(ctx, cfg, proxyTokenCache) - if err != nil { - return nil, err - } - httpClient.Transport = &proxyAuthTransport{ - transport: transport, - tokenSource: proxyTokenSource, + transport: transport, + proxyCredentialsFuture: proxyCredentialsFuture, } } else { httpClient.Transport = transport @@ -134,13 +131,12 @@ func setHTTPClientContext(ctx context.Context, cfg *Config, proxyTokenCache cach // more. It'll fail hard if it couldn't do so (i.e. it will no longer attempt to send an unauthenticated request). Once // a token source has been created, it'll invoke the grpc pipeline again, this time the grpc.PerRPCCredentials should // be able to find and acquire a valid AccessToken to annotate the request with. -func NewAuthInterceptor(cfg *Config, tokenCache cache.TokenCache, proxyTokenCache cache.TokenCache, credentialsFuture *PerRPCCredentialsFuture) grpc.UnaryClientInterceptor { +func NewAuthInterceptor(cfg *Config, tokenCache cache.TokenCache, credentialsFuture *PerRPCCredentialsFuture, proxyCredentialsFuture *PerRPCCredentialsFuture) grpc.UnaryClientInterceptor { return func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error { - ctx, err := setHTTPClientContext(ctx, cfg, proxyTokenCache) + ctx, err := setHTTPClientContext(ctx, cfg, proxyCredentialsFuture) if err != nil { return err } - err = invoker(ctx, method, req, reply, cc, opts...) if err != nil { logger.Debugf(ctx, "Request failed due to [%v]. If it's an unauthenticated error, we will attempt to establish an authenticated context.", err) @@ -149,7 +145,7 @@ func NewAuthInterceptor(cfg *Config, tokenCache cache.TokenCache, proxyTokenCach // If the error we receive from executing the request expects if shouldAttemptToAuthenticate(st.Code()) { logger.Debugf(ctx, "Request failed due to [%v]. Attempting to establish an authenticated connection and trying again.", st.Code()) - newErr := MaterializeCredentials(ctx, cfg, tokenCache, proxyTokenCache, credentialsFuture) + newErr := MaterializeCredentials(ctx, cfg, tokenCache, credentialsFuture, proxyCredentialsFuture) if newErr != nil { return fmt.Errorf("authentication error! Original Error: %v, Auth Error: %w", err, newErr) } @@ -163,12 +159,17 @@ func NewAuthInterceptor(cfg *Config, tokenCache cache.TokenCache, proxyTokenCach } } -func NewProxyAuthInterceptor(cfg *Config, proxyTokenCache cache.TokenCache) grpc.UnaryClientInterceptor { +func NewProxyAuthInterceptor(cfg *Config, proxyCredentialsFuture *PerRPCCredentialsFuture) grpc.UnaryClientInterceptor { return func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error { - ctx, err := MaterializeProxyAuthCredentials(ctx, cfg, proxyTokenCache) + + err := invoker(ctx, method, req, reply, cc, opts...) if err != nil { - return fmt.Errorf("proxy authorization error! Original Error: %v", err) + ctx, err := MaterializeProxyAuthCredentials(ctx, cfg, proxyCredentialsFuture) + if err != nil { + return fmt.Errorf("proxy authorization error! Original Error: %v", err) + } + return invoker(ctx, method, req, reply, cc, opts...) } - return invoker(ctx, method, req, reply, cc, opts...) + return err } } diff --git a/clients/go/admin/client.go b/clients/go/admin/client.go index 2dc94980d..ac0d63cc8 100644 --- a/clients/go/admin/client.go +++ b/clients/go/admin/client.go @@ -110,9 +110,9 @@ func getAuthenticationDialOption(ctx context.Context, cfg *Config, tokenSourcePr } // InitializeAuthMetadataClient creates a new anonymously Auth Metadata Service client. -func InitializeAuthMetadataClient(ctx context.Context, cfg *Config, proxyTokenCache cache.TokenCache) (client service.AuthMetadataServiceClient, err error) { +func InitializeAuthMetadataClient(ctx context.Context, cfg *Config, proxyCredentialsFuture *PerRPCCredentialsFuture) (client service.AuthMetadataServiceClient, err error) { // Create an unauthenticated connection to fetch AuthMetadata - authMetadataConnection, err := NewAdminConnection(ctx, cfg, proxyTokenCache) + authMetadataConnection, err := NewAdminConnection(ctx, cfg, proxyCredentialsFuture) if err != nil { return nil, fmt.Errorf("failed to initialized admin connection. Error: %w", err) } @@ -120,7 +120,7 @@ func InitializeAuthMetadataClient(ctx context.Context, cfg *Config, proxyTokenCa return service.NewAuthMetadataServiceClient(authMetadataConnection), nil } -func NewAdminConnection(ctx context.Context, cfg *Config, proxyTokenCache cache.TokenCache, opts ...grpc.DialOption) (*grpc.ClientConn, error) { +func NewAdminConnection(ctx context.Context, cfg *Config, proxyCredentialsFuture *PerRPCCredentialsFuture, opts ...grpc.DialOption) (*grpc.ClientConn, error) { if opts == nil { // Initialize opts list to the potential number of options we will add. Initialization optimizes memory // allocation. @@ -153,9 +153,9 @@ func NewAdminConnection(ctx context.Context, cfg *Config, proxyTokenCache cache. opts = append(opts, GetAdditionalAdminClientConfigOptions(cfg)...) - // Ensure proxy auth interceptor is invoked prior to auth interceptor if cfg.ProxyCommand != nil { - opts = append([]grpc.DialOption{grpc.WithChainUnaryInterceptor(NewProxyAuthInterceptor(cfg, proxyTokenCache))}, opts...) + opts = append(opts, grpc.WithChainUnaryInterceptor(NewProxyAuthInterceptor(cfg, proxyCredentialsFuture))) + opts = append(opts, grpc.WithPerRPCCredentials(proxyCredentialsFuture)) } return grpc.Dial(cfg.Endpoint.String(), opts...) @@ -164,7 +164,7 @@ func NewAdminConnection(ctx context.Context, cfg *Config, proxyTokenCache cache. // InitializeAdminClient creates an AdminClient with a shared Admin connection for the process // Deprecated: Please use initializeClients instead. func InitializeAdminClient(ctx context.Context, cfg *Config, opts ...grpc.DialOption) service.AdminServiceClient { - set, err := initializeClients(ctx, cfg, nil, nil, opts...) + set, err := initializeClients(ctx, cfg, nil, opts...) if err != nil { logger.Panicf(ctx, "Failed to initialized client. Error: %v", err) return nil @@ -175,18 +175,19 @@ func InitializeAdminClient(ctx context.Context, cfg *Config, opts ...grpc.DialOp // initializeClients creates an AdminClient, AuthServiceClient and IdentityServiceClient with a shared Admin connection // for the process. Note that if called with different cfg/dialoptions, it will not refresh the connection. -func initializeClients(ctx context.Context, cfg *Config, tokenCache cache.TokenCache, proxyTokenCache cache.TokenCache, opts ...grpc.DialOption) (*Clientset, error) { +func initializeClients(ctx context.Context, cfg *Config, tokenCache cache.TokenCache, opts ...grpc.DialOption) (*Clientset, error) { credentialsFuture := NewPerRPCCredentialsFuture() + proxyCredentialsFuture := NewPerRPCCredentialsFuture() opts = append(opts, - grpc.WithChainUnaryInterceptor(NewAuthInterceptor(cfg, tokenCache, proxyTokenCache, credentialsFuture)), + grpc.WithChainUnaryInterceptor(NewAuthInterceptor(cfg, tokenCache, credentialsFuture, proxyCredentialsFuture)), grpc.WithPerRPCCredentials(credentialsFuture)) if cfg.DefaultServiceConfig != "" { opts = append(opts, grpc.WithDefaultServiceConfig(cfg.DefaultServiceConfig)) } - adminConnection, err := NewAdminConnection(ctx, cfg, proxyTokenCache, opts...) + adminConnection, err := NewAdminConnection(ctx, cfg, proxyCredentialsFuture, opts...) if err != nil { logger.Panicf(ctx, "failed to initialized Admin connection. Err: %s", err.Error()) } @@ -204,7 +205,7 @@ func initializeClients(ctx context.Context, cfg *Config, tokenCache cache.TokenC // Deprecated: Please use NewClientsetBuilder() instead. func InitializeAdminClientFromConfig(ctx context.Context, tokenCache cache.TokenCache, opts ...grpc.DialOption) (service.AdminServiceClient, error) { - clientSet, err := initializeClients(ctx, GetConfig(ctx), tokenCache, nil, opts...) + clientSet, err := initializeClients(ctx, GetConfig(ctx), tokenCache, opts...) if err != nil { return nil, err } diff --git a/clients/go/admin/client_builder.go b/clients/go/admin/client_builder.go index d5ee75a6c..a16480739 100644 --- a/clients/go/admin/client_builder.go +++ b/clients/go/admin/client_builder.go @@ -10,10 +10,9 @@ import ( // ClientsetBuilder is used to build the clientset. This allows custom token cache implementations to be plugged in. type ClientsetBuilder struct { - config *Config - tokenCache cache.TokenCache - proxyTokenCache cache.TokenCache - opts []grpc.DialOption + config *Config + tokenCache cache.TokenCache + opts []grpc.DialOption } // ClientSetBuilder is constructor function to be used by the clients in interacting with the builder @@ -33,13 +32,6 @@ func (cb *ClientsetBuilder) WithTokenCache(tokenCache cache.TokenCache) *Clients return cb } -// TokenCache is designed to cache a single token. When clients choose to send `"proxy-authorization`" -// headers, we, thus, employ a separate token cache. -func (cb *ClientsetBuilder) WithProxyTokenCache(tokenCache cache.TokenCache) *ClientsetBuilder { - cb.proxyTokenCache = tokenCache - return cb -} - func (cb *ClientsetBuilder) WithDialOptions(opts ...grpc.DialOption) *ClientsetBuilder { cb.opts = opts return cb @@ -50,15 +42,12 @@ func (cb *ClientsetBuilder) Build(ctx context.Context) (*Clientset, error) { if cb.tokenCache == nil { cb.tokenCache = &cache.TokenCacheInMemoryProvider{} } - if cb.proxyTokenCache == nil { - cb.proxyTokenCache = &cache.TokenCacheInMemoryProvider{} - } if cb.config == nil { cb.config = GetConfig(ctx) } - return initializeClients(ctx, cb.config, cb.tokenCache, cb.proxyTokenCache, cb.opts...) + return initializeClients(ctx, cb.config, cb.tokenCache, cb.opts...) } func NewClientsetBuilder() *ClientsetBuilder { diff --git a/clients/go/admin/externalprocess/token.go b/clients/go/admin/externalprocess/token.go deleted file mode 100644 index b262f3e26..000000000 --- a/clients/go/admin/externalprocess/token.go +++ /dev/null @@ -1,37 +0,0 @@ -package externalprocess - -import ( - "encoding/base64" - "encoding/json" - "errors" - "strings" - "time" -) - -type jwtClaims struct { - Exp int64 `json:"exp"` -} - -// When we receive a token from an external process, we don't have a key to validate it -// For caching purposes, we still want to know when the token *allegedly* expires. -func GetUnvalidatedTokenExpiration(tokenString string) (time.Time, error) { - parts := strings.Split(tokenString, ".") - if len(parts) != 3 { - return time.Time{}, errors.New("invalid token") - } - - payload, err := base64.RawURLEncoding.DecodeString(parts[1]) - if err != nil { - return time.Time{}, err - } - - var claims jwtClaims - err = json.Unmarshal(payload, &claims) - if err != nil { - return time.Time{}, err - } - - expiry := time.Unix(claims.Exp, 0) - - return expiry, nil -} diff --git a/clients/go/admin/externalprocess/token_test.go b/clients/go/admin/externalprocess/token_test.go deleted file mode 100644 index 0b7a077c3..000000000 --- a/clients/go/admin/externalprocess/token_test.go +++ /dev/null @@ -1,24 +0,0 @@ -package externalprocess - -import ( - "encoding/base64" - "encoding/json" - "testing" - "time" - - "github.com/stretchr/testify/assert" -) - -func TestGetUnvalidatedTokenExpiration(t *testing.T) { - - exp := time.Now().Unix() - claims := jwtClaims{Exp: exp} - payload, _ := json.Marshal(claims) - token := "header." + base64.RawURLEncoding.EncodeToString(payload) + ".signature" - - expiry, err := GetUnvalidatedTokenExpiration(token) - if err != nil { - t.Errorf("Unexpected error: %v", err) - } - assert.Equal(t, exp, expiry.Unix()) -} diff --git a/clients/go/admin/token_source_provider.go b/clients/go/admin/token_source_provider.go index 695273b33..e836bf81a 100644 --- a/clients/go/admin/token_source_provider.go +++ b/clients/go/admin/token_source_provider.go @@ -8,7 +8,6 @@ import ( "os" "strings" "sync" - "time" "golang.org/x/oauth2" "golang.org/x/oauth2/clientcredentials" @@ -87,7 +86,7 @@ func NewTokenSourceProvider(ctx context.Context, cfg *Config, tokenCache cache.T return nil, err } case AuthTypeExternalCommand: - tokenProvider, err = NewExternalTokenSourceProvider(cfg.Command, tokenCache) + tokenProvider, err = NewExternalTokenSourceProvider(cfg.Command) if err != nil { return nil, err } @@ -108,51 +107,24 @@ func NewTokenSourceProvider(ctx context.Context, cfg *Config, tokenCache cache.T return tokenProvider, nil } -type ExternalTokenSource struct { +type ExternalTokenSourceProvider struct { command []string } -func NewExternalTokenSource(command []string) oauth2.TokenSource { - return &ExternalTokenSource{command: command} +func NewExternalTokenSourceProvider(command []string) (TokenSourceProvider, error) { + return &ExternalTokenSourceProvider{command: command}, nil } -func (e *ExternalTokenSource) Token() (*oauth2.Token, error) { +func (e ExternalTokenSourceProvider) GetTokenSource(ctx context.Context) (oauth2.TokenSource, error) { output, err := externalprocess.Execute(e.command) if err != nil { return nil, err } - token := strings.Trim(string(output), "\t \n") - exp, err := externalprocess.GetUnvalidatedTokenExpiration(token) - if err != nil { - // If we cannot extract an expiration, as a precaution, we do not - // want to cache the token as otherwise the external command would - // never be called again. Note that `exp = time.Time{}` would cause - // the token to be considered valid forever. - exp = time.Unix(0, 0) - } - return &oauth2.Token{ - AccessToken: token, - TokenType: "bearer", - Expiry: exp, - }, nil -} -type ExternalTokenSourceProvider struct { - command []string - tokenCache cache.TokenCache -} - -func NewExternalTokenSourceProvider(command []string, tokenCache cache.TokenCache) (TokenSourceProvider, error) { - return &ExternalTokenSourceProvider{command: command, tokenCache: tokenCache}, nil -} - -func (e *ExternalTokenSourceProvider) GetTokenSource(ctx context.Context) (oauth2.TokenSource, error) { - return &customTokenSource{ - ctx: ctx, - new: NewExternalTokenSource(e.command), - mu: sync.Mutex{}, - tokenCache: e.tokenCache, - }, nil + return oauth2.StaticTokenSource(&oauth2.Token{ + AccessToken: strings.Trim(string(output), "\t \n"), + TokenType: "bearer", + }), nil } type PKCETokenSourceProvider struct { From 7bce3d98c254283be63a8e1e5ba851e9f3c476ab Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fabio=20Gr=C3=A4tz?= Date: Fri, 22 Sep 2023 18:27:05 +0200 Subject: [PATCH 08/16] Adapt existing tests to not using a 2nd token cache but a 2nd credentials future MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Fabio Grätz --- clients/go/admin/auth_interceptor.go | 8 +- clients/go/admin/auth_interceptor_test.go | 193 ++++++++++------------ clients/go/admin/client_test.go | 6 +- 3 files changed, 95 insertions(+), 112 deletions(-) diff --git a/clients/go/admin/auth_interceptor.go b/clients/go/admin/auth_interceptor.go index a36ca98bb..aa40860ed 100644 --- a/clients/go/admin/auth_interceptor.go +++ b/clients/go/admin/auth_interceptor.go @@ -63,16 +63,16 @@ func GetProxyTokenSource(ctx context.Context, cfg *Config) (oauth2.TokenSource, return proxyTokenSource, nil } -func MaterializeProxyAuthCredentials(ctx context.Context, cfg *Config, proxyCredentialsFuture *PerRPCCredentialsFuture) (context.Context, error) { +func MaterializeProxyAuthCredentials(ctx context.Context, cfg *Config, proxyCredentialsFuture *PerRPCCredentialsFuture) error { proxyTokenSource, err := GetProxyTokenSource(ctx, cfg) if err != nil { - return nil, err + return err } wrappedTokenSource := NewCustomHeaderTokenSource(proxyTokenSource, cfg.UseInsecureConnection, ProxyAuthorizationHeader) proxyCredentialsFuture.Store(wrappedTokenSource) - return ctx, nil + return nil } func shouldAttemptToAuthenticate(errorCode codes.Code) bool { @@ -164,7 +164,7 @@ func NewProxyAuthInterceptor(cfg *Config, proxyCredentialsFuture *PerRPCCredenti err := invoker(ctx, method, req, reply, cc, opts...) if err != nil { - ctx, err := MaterializeProxyAuthCredentials(ctx, cfg, proxyCredentialsFuture) + err := MaterializeProxyAuthCredentials(ctx, cfg, proxyCredentialsFuture) if err != nil { return fmt.Errorf("proxy authorization error! Original Error: %v", err) } diff --git a/clients/go/admin/auth_interceptor_test.go b/clients/go/admin/auth_interceptor_test.go index 9579439fe..49cbaf73f 100644 --- a/clients/go/admin/auth_interceptor_test.go +++ b/clients/go/admin/auth_interceptor_test.go @@ -15,14 +15,11 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" - "golang.org/x/oauth2" "google.golang.org/grpc" "google.golang.org/grpc/codes" - "google.golang.org/grpc/metadata" "google.golang.org/grpc/status" "k8s.io/apimachinery/pkg/util/rand" - "github.com/flyteorg/flyteidl/clients/go/admin/cache" "github.com/flyteorg/flyteidl/clients/go/admin/cache/mocks" adminMocks "github.com/flyteorg/flyteidl/clients/go/admin/mocks" "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/service" @@ -117,7 +114,8 @@ func newAuthMetadataServer(t testing.TB, port int, impl service.AuthMetadataServ func Test_newAuthInterceptor(t *testing.T) { t.Run("Other Error", func(t *testing.T) { f := NewPerRPCCredentialsFuture() - interceptor := NewAuthInterceptor(&Config{}, &mocks.TokenCache{}, &mocks.TokenCache{}, f) + p := NewPerRPCCredentialsFuture() + interceptor := NewAuthInterceptor(&Config{}, &mocks.TokenCache{}, f, p) otherError := func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, opts ...grpc.CallOption) error { return status.New(codes.Canceled, "").Err() } @@ -149,11 +147,12 @@ func Test_newAuthInterceptor(t *testing.T) { assert.NoError(t, err) f := NewPerRPCCredentialsFuture() + p := NewPerRPCCredentialsFuture() interceptor := NewAuthInterceptor(&Config{ Endpoint: config.URL{URL: *u}, UseInsecureConnection: true, AuthType: AuthTypeClientSecret, - }, &mocks.TokenCache{}, &mocks.TokenCache{}, f) + }, &mocks.TokenCache{}, f, p) unauthenticated := func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, opts ...grpc.CallOption) error { return status.New(codes.Unauthenticated, "").Err() } @@ -180,11 +179,13 @@ func Test_newAuthInterceptor(t *testing.T) { assert.NoError(t, err) f := NewPerRPCCredentialsFuture() + p := NewPerRPCCredentialsFuture() + interceptor := NewAuthInterceptor(&Config{ Endpoint: config.URL{URL: *u}, UseInsecureConnection: true, AuthType: AuthTypeClientSecret, - }, &mocks.TokenCache{}, &mocks.TokenCache{}, f) + }, &mocks.TokenCache{}, f, p) authenticated := func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, opts ...grpc.CallOption) error { return nil } @@ -219,11 +220,13 @@ func Test_newAuthInterceptor(t *testing.T) { assert.NoError(t, err) f := NewPerRPCCredentialsFuture() + p := NewPerRPCCredentialsFuture() + interceptor := NewAuthInterceptor(&Config{ Endpoint: config.URL{URL: *u}, UseInsecureConnection: true, AuthType: AuthTypeClientSecret, - }, &mocks.TokenCache{}, &mocks.TokenCache{}, f) + }, &mocks.TokenCache{}, f, p) unauthenticated := func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, opts ...grpc.CallOption) error { return status.New(codes.Aborted, "").Err() } @@ -249,6 +252,8 @@ func TestMaterializeCredentials(t *testing.T) { assert.NoError(t, err) f := NewPerRPCCredentialsFuture() + p := NewPerRPCCredentialsFuture() + err = MaterializeCredentials(ctx, &Config{ Endpoint: config.URL{URL: *u}, UseInsecureConnection: true, @@ -257,7 +262,7 @@ func TestMaterializeCredentials(t *testing.T) { Scopes: []string{"all"}, Audience: "http://localhost:30081", AuthorizationHeader: "authorization", - }, &mocks.TokenCache{}, &mocks.TokenCache{}, f) + }, &mocks.TokenCache{}, f, p) assert.NoError(t, err) }) t.Run("Failed to fetch client metadata", func(t *testing.T) { @@ -274,110 +279,88 @@ func TestMaterializeCredentials(t *testing.T) { assert.NoError(t, err) f := NewPerRPCCredentialsFuture() + p := NewPerRPCCredentialsFuture() + err = MaterializeCredentials(ctx, &Config{ Endpoint: config.URL{URL: *u}, UseInsecureConnection: true, AuthType: AuthTypeClientSecret, TokenURL: fmt.Sprintf("http://localhost:%d/api/v1/token", port), Scopes: []string{"all"}, - }, &mocks.TokenCache{}, &mocks.TokenCache{}, f) + }, &mocks.TokenCache{}, f, p) assert.EqualError(t, err, "failed to fetch client metadata. Error: rpc error: code = Unknown desc = expected err") }) } -func TestNewProxyAuthInterceptor(t *testing.T) { - cfg := &Config{ - ProxyCommand: []string{"echo", "test-token"}, - } - tokenCache := &cache.TokenCacheInMemoryProvider{} - - interceptor := NewProxyAuthInterceptor(cfg, tokenCache) - - ctx := context.Background() - method := "/test.method" - req := "request" - reply := "reply" - cc := new(grpc.ClientConn) - - testInvoker := func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, opts ...grpc.CallOption) error { - md, _ := metadata.FromOutgoingContext(ctx) - assert.Equal(t, []string{"Bearer test-token"}, md.Get(ProxyAuthorizationHeader)) - return nil - } - - err := interceptor(ctx, method, req, reply, cc, testInvoker) - - assert.NoError(t, err) -} - -type testRoundTripper struct { - RoundTripFunc func(req *http.Request) (*http.Response, error) -} - -func (t *testRoundTripper) RoundTrip(req *http.Request) (*http.Response, error) { - return t.RoundTripFunc(req) -} - -func TestSetHTTPClientContext(t *testing.T) { - ctx := context.Background() - tokenCache := &cache.TokenCacheInMemoryProvider{} - - t.Run("no proxy command and no proxy url", func(t *testing.T) { - cfg := &Config{} - newCtx, err := setHTTPClientContext(ctx, cfg, tokenCache) - assert.NoError(t, err) - - httpClient, ok := newCtx.Value(oauth2.HTTPClient).(*http.Client) - assert.True(t, ok) - - transport, ok := httpClient.Transport.(*http.Transport) - assert.True(t, ok) - assert.Nil(t, transport.Proxy) - }) - - t.Run("proxy url", func(t *testing.T) { - cfg := &Config{ - HTTPProxyURL: config. - URL{URL: url.URL{ - Scheme: "http", - Host: "localhost:8080", - }}, - } - newCtx, err := setHTTPClientContext(ctx, cfg, tokenCache) - assert.NoError(t, err) - - httpClient, ok := newCtx.Value(oauth2.HTTPClient).(*http.Client) - assert.True(t, ok) - - transport, ok := httpClient.Transport.(*http.Transport) - assert.True(t, ok) - assert.NotNil(t, transport.Proxy) - }) - - t.Run("proxy command adds proxy-authorization header", func(t *testing.T) { - cfg := &Config{ - ProxyCommand: []string{"echo", "test-token-http-client"}, - } - newCtx, err := setHTTPClientContext(ctx, cfg, tokenCache) - assert.NoError(t, err) - - httpClient, ok := newCtx.Value(oauth2.HTTPClient).(*http.Client) - assert.True(t, ok) - - pat, ok := httpClient.Transport.(*proxyAuthTransport) - assert.True(t, ok) - - testRoundTripper := &testRoundTripper{ - RoundTripFunc: func(req *http.Request) (*http.Response, error) { - // Check if the ProxyAuthorizationHeader is correctly set - assert.Equal(t, "Bearer test-token-http-client", req.Header.Get(ProxyAuthorizationHeader)) - return &http.Response{StatusCode: http.StatusOK}, nil - }, - } - pat.transport = testRoundTripper - - req, _ := http.NewRequest("GET", "http://example.com", nil) - _, err = httpClient.Do(req) - assert.NoError(t, err) - }) -} +// type testRoundTripper struct { +// RoundTripFunc func(req *http.Request) (*http.Response, error) +// } + +// func (t *testRoundTripper) RoundTrip(req *http.Request) (*http.Response, error) { +// return t.RoundTripFunc(req) +// } + +// func TestSetHTTPClientContext(t *testing.T) { +// ctx := context.Background() +// tokenCache := &cache.TokenCacheInMemoryProvider{} + +// t.Run("no proxy command and no proxy url", func(t *testing.T) { +// cfg := &Config{} + +// newCtx, err := setHTTPClientContext(ctx, cfg, tokenCache) +// assert.NoError(t, err) + +// httpClient, ok := newCtx.Value(oauth2.HTTPClient).(*http.Client) +// assert.True(t, ok) + +// transport, ok := httpClient.Transport.(*http.Transport) +// assert.True(t, ok) +// assert.Nil(t, transport.Proxy) +// }) + +// t.Run("proxy url", func(t *testing.T) { +// cfg := &Config{ +// HTTPProxyURL: config. +// URL{URL: url.URL{ +// Scheme: "http", +// Host: "localhost:8080", +// }}, +// } +// newCtx, err := setHTTPClientContext(ctx, cfg, tokenCache) +// assert.NoError(t, err) + +// httpClient, ok := newCtx.Value(oauth2.HTTPClient).(*http.Client) +// assert.True(t, ok) + +// transport, ok := httpClient.Transport.(*http.Transport) +// assert.True(t, ok) +// assert.NotNil(t, transport.Proxy) +// }) + +// t.Run("proxy command adds proxy-authorization header", func(t *testing.T) { +// cfg := &Config{ +// ProxyCommand: []string{"echo", "test-token-http-client"}, +// } +// newCtx, err := setHTTPClientContext(ctx, cfg, tokenCache) +// assert.NoError(t, err) + +// httpClient, ok := newCtx.Value(oauth2.HTTPClient).(*http.Client) +// assert.True(t, ok) + +// pat, ok := httpClient.Transport.(*proxyAuthTransport) +// assert.True(t, ok) + +// testRoundTripper := &testRoundTripper{ +// RoundTripFunc: func(req *http.Request) (*http.Response, error) { +// // Check if the ProxyAuthorizationHeader is correctly set +// assert.Equal(t, "Bearer test-token-http-client", req.Header.Get(ProxyAuthorizationHeader)) +// return &http.Response{StatusCode: http.StatusOK}, nil +// }, +// } +// pat.transport = testRoundTripper + +// req, _ := http.NewRequest("GET", "http://example.com", nil) +// _, err = httpClient.Do(req) +// assert.NoError(t, err) +// }) +// } diff --git a/clients/go/admin/client_test.go b/clients/go/admin/client_test.go index 137ffed1f..017f4e8ff 100644 --- a/clients/go/admin/client_test.go +++ b/clients/go/admin/client_test.go @@ -77,7 +77,7 @@ func TestGetAdditionalAdminClientConfigOptions(t *testing.T) { }) t.Run("legal-from-config", func(t *testing.T) { - clientSet, err := initializeClients(ctx, &Config{InsecureSkipVerify: true}, nil, nil) + clientSet, err := initializeClients(ctx, &Config{InsecureSkipVerify: true}, nil) assert.NoError(t, err) assert.NotNil(t, clientSet) assert.NotNil(t, clientSet.AuthMetadataClient()) @@ -85,7 +85,7 @@ func TestGetAdditionalAdminClientConfigOptions(t *testing.T) { assert.NotNil(t, clientSet.HealthServiceClient()) }) t.Run("legal-from-config-with-cacerts", func(t *testing.T) { - clientSet, err := initializeClients(ctx, &Config{CACertFilePath: "testdata/root.pem"}, nil, nil) + clientSet, err := initializeClients(ctx, &Config{CACertFilePath: "testdata/root.pem"}, nil) assert.NoError(t, err) assert.NotNil(t, clientSet) assert.NotNil(t, clientSet.AuthMetadataClient()) @@ -105,7 +105,7 @@ func TestGetAdditionalAdminClientConfigOptions(t *testing.T) { } assert.NoError(t, SetConfig(newAdminServiceConfig)) - clientSet, err := initializeClients(ctx, newAdminServiceConfig, nil, nil) + clientSet, err := initializeClients(ctx, newAdminServiceConfig, nil) assert.NotNil(t, err) assert.Nil(t, clientSet) }) From 132dbb9b1a2a867ee6c31306b39289b283aead2e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fabio=20Gr=C3=A4tz?= Date: Fri, 22 Sep 2023 18:32:41 +0200 Subject: [PATCH 09/16] Adapt new tests to not using a 2nd token cache but a 2nd credentials future MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Fabio Grätz --- clients/go/admin/auth_interceptor_test.go | 178 +++++++++++++--------- 1 file changed, 106 insertions(+), 72 deletions(-) diff --git a/clients/go/admin/auth_interceptor_test.go b/clients/go/admin/auth_interceptor_test.go index 49cbaf73f..7f43131be 100644 --- a/clients/go/admin/auth_interceptor_test.go +++ b/clients/go/admin/auth_interceptor_test.go @@ -15,6 +15,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" + "golang.org/x/oauth2" "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" @@ -292,75 +293,108 @@ func TestMaterializeCredentials(t *testing.T) { }) } -// type testRoundTripper struct { -// RoundTripFunc func(req *http.Request) (*http.Response, error) -// } - -// func (t *testRoundTripper) RoundTrip(req *http.Request) (*http.Response, error) { -// return t.RoundTripFunc(req) -// } - -// func TestSetHTTPClientContext(t *testing.T) { -// ctx := context.Background() -// tokenCache := &cache.TokenCacheInMemoryProvider{} - -// t.Run("no proxy command and no proxy url", func(t *testing.T) { -// cfg := &Config{} - -// newCtx, err := setHTTPClientContext(ctx, cfg, tokenCache) -// assert.NoError(t, err) - -// httpClient, ok := newCtx.Value(oauth2.HTTPClient).(*http.Client) -// assert.True(t, ok) - -// transport, ok := httpClient.Transport.(*http.Transport) -// assert.True(t, ok) -// assert.Nil(t, transport.Proxy) -// }) - -// t.Run("proxy url", func(t *testing.T) { -// cfg := &Config{ -// HTTPProxyURL: config. -// URL{URL: url.URL{ -// Scheme: "http", -// Host: "localhost:8080", -// }}, -// } -// newCtx, err := setHTTPClientContext(ctx, cfg, tokenCache) -// assert.NoError(t, err) - -// httpClient, ok := newCtx.Value(oauth2.HTTPClient).(*http.Client) -// assert.True(t, ok) - -// transport, ok := httpClient.Transport.(*http.Transport) -// assert.True(t, ok) -// assert.NotNil(t, transport.Proxy) -// }) - -// t.Run("proxy command adds proxy-authorization header", func(t *testing.T) { -// cfg := &Config{ -// ProxyCommand: []string{"echo", "test-token-http-client"}, -// } -// newCtx, err := setHTTPClientContext(ctx, cfg, tokenCache) -// assert.NoError(t, err) - -// httpClient, ok := newCtx.Value(oauth2.HTTPClient).(*http.Client) -// assert.True(t, ok) - -// pat, ok := httpClient.Transport.(*proxyAuthTransport) -// assert.True(t, ok) - -// testRoundTripper := &testRoundTripper{ -// RoundTripFunc: func(req *http.Request) (*http.Response, error) { -// // Check if the ProxyAuthorizationHeader is correctly set -// assert.Equal(t, "Bearer test-token-http-client", req.Header.Get(ProxyAuthorizationHeader)) -// return &http.Response{StatusCode: http.StatusOK}, nil -// }, -// } -// pat.transport = testRoundTripper - -// req, _ := http.NewRequest("GET", "http://example.com", nil) -// _, err = httpClient.Do(req) -// assert.NoError(t, err) -// }) -// } +func TestNewProxyAuthInterceptor(t *testing.T) { + cfg := &Config{ + ProxyCommand: []string{"echo", "test-token"}, + } + + p := NewPerRPCCredentialsFuture() + + interceptor := NewProxyAuthInterceptor(cfg, p) + + ctx := context.Background() + method := "/test.method" + req := "request" + reply := "reply" + cc := new(grpc.ClientConn) + + errorInvoker := func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, opts ...grpc.CallOption) error { + return errors.New("test error") + } + + // Call should return an error and trigger the interceptor to materialize proxy auth credentials + err := interceptor(ctx, method, req, reply, cc, errorInvoker) + assert.Error(t, err) + + // Check if proxyCredentialsFuture contains a proxy auth header token + creds, err := p.Get().GetRequestMetadata(ctx, "") + assert.True(t, p.IsInitialized()) + assert.NoError(t, err) + assert.Equal(t, "Bearer test-token", creds[ProxyAuthorizationHeader]) +} + +type testRoundTripper struct { + RoundTripFunc func(req *http.Request) (*http.Response, error) +} + +func (t *testRoundTripper) RoundTrip(req *http.Request) (*http.Response, error) { + return t.RoundTripFunc(req) +} + +func TestSetHTTPClientContext(t *testing.T) { + ctx := context.Background() + + t.Run("no proxy command and no proxy url", func(t *testing.T) { + cfg := &Config{} + + newCtx, err := setHTTPClientContext(ctx, cfg, nil) + assert.NoError(t, err) + + httpClient, ok := newCtx.Value(oauth2.HTTPClient).(*http.Client) + assert.True(t, ok) + + transport, ok := httpClient.Transport.(*http.Transport) + assert.True(t, ok) + assert.Nil(t, transport.Proxy) + }) + + t.Run("proxy url", func(t *testing.T) { + cfg := &Config{ + HTTPProxyURL: config. + URL{URL: url.URL{ + Scheme: "http", + Host: "localhost:8080", + }}, + } + newCtx, err := setHTTPClientContext(ctx, cfg, nil) + assert.NoError(t, err) + + httpClient, ok := newCtx.Value(oauth2.HTTPClient).(*http.Client) + assert.True(t, ok) + + transport, ok := httpClient.Transport.(*http.Transport) + assert.True(t, ok) + assert.NotNil(t, transport.Proxy) + }) + + t.Run("proxy command adds proxy-authorization header", func(t *testing.T) { + cfg := &Config{ + ProxyCommand: []string{"echo", "test-token-http-client"}, + } + + p := NewPerRPCCredentialsFuture() + MaterializeProxyAuthCredentials(ctx, cfg, p) + + newCtx, err := setHTTPClientContext(ctx, cfg, p) + assert.NoError(t, err) + + httpClient, ok := newCtx.Value(oauth2.HTTPClient).(*http.Client) + assert.True(t, ok) + + pat, ok := httpClient.Transport.(*proxyAuthTransport) + assert.True(t, ok) + + testRoundTripper := &testRoundTripper{ + RoundTripFunc: func(req *http.Request) (*http.Response, error) { + // Check if the ProxyAuthorizationHeader is correctly set + assert.Equal(t, "Bearer test-token-http-client", req.Header.Get(ProxyAuthorizationHeader)) + return &http.Response{StatusCode: http.StatusOK}, nil + }, + } + pat.transport = testRoundTripper + + req, _ := http.NewRequest("GET", "http://example.com", nil) + _, err = httpClient.Do(req) + assert.NoError(t, err) + }) +} From 66e6f659b983157db880313925ca627c789cd648 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fabio=20Gr=C3=A4tz?= Date: Fri, 22 Sep 2023 18:39:04 +0200 Subject: [PATCH 10/16] Fix number of opts in NewAdminConnection MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Fabio Grätz --- clients/go/admin/client.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/clients/go/admin/client.go b/clients/go/admin/client.go index ac0d63cc8..69e109388 100644 --- a/clients/go/admin/client.go +++ b/clients/go/admin/client.go @@ -124,7 +124,7 @@ func NewAdminConnection(ctx context.Context, cfg *Config, proxyCredentialsFuture if opts == nil { // Initialize opts list to the potential number of options we will add. Initialization optimizes memory // allocation. - opts = make([]grpc.DialOption, 0, 6) + opts = make([]grpc.DialOption, 0, 7) } if cfg.UseInsecureConnection { From f8caf4f986cb6bc16527a4838b97befd9be0ecf4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fabio=20Gr=C3=A4tz?= Date: Fri, 22 Sep 2023 18:41:43 +0200 Subject: [PATCH 11/16] Don't overwrite original error in NewProxyAuthInterceptor MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Fabio Grätz --- clients/go/admin/auth_interceptor.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/clients/go/admin/auth_interceptor.go b/clients/go/admin/auth_interceptor.go index aa40860ed..3630f5dbd 100644 --- a/clients/go/admin/auth_interceptor.go +++ b/clients/go/admin/auth_interceptor.go @@ -164,9 +164,9 @@ func NewProxyAuthInterceptor(cfg *Config, proxyCredentialsFuture *PerRPCCredenti err := invoker(ctx, method, req, reply, cc, opts...) if err != nil { - err := MaterializeProxyAuthCredentials(ctx, cfg, proxyCredentialsFuture) - if err != nil { - return fmt.Errorf("proxy authorization error! Original Error: %v", err) + newErr := MaterializeProxyAuthCredentials(ctx, cfg, proxyCredentialsFuture) + if newErr != nil { + return fmt.Errorf("proxy authorization error! Original Error: %v, Proxy Auth Error: %w", err, newErr) } return invoker(ctx, method, req, reply, cc, opts...) } From b947012b2a98e14ee5d0faa8d1ff0ad9c6c3ac80 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fabio=20Gr=C3=A4tz?= Date: Fri, 22 Sep 2023 18:43:42 +0200 Subject: [PATCH 12/16] Improve error message when failing to create http client for oauth MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Fabio Grätz --- clients/go/admin/auth_interceptor.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/clients/go/admin/auth_interceptor.go b/clients/go/admin/auth_interceptor.go index 3630f5dbd..e2f833ce7 100644 --- a/clients/go/admin/auth_interceptor.go +++ b/clients/go/admin/auth_interceptor.go @@ -135,7 +135,7 @@ func NewAuthInterceptor(cfg *Config, tokenCache cache.TokenCache, credentialsFut return func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error { ctx, err := setHTTPClientContext(ctx, cfg, proxyCredentialsFuture) if err != nil { - return err + return fmt.Errorf("Could not create http client for oauth! Original Error: %v", err) } err = invoker(ctx, method, req, reply, cc, opts...) if err != nil { From 19d069ef4e0fc8fae57bbc67869d667de0b2c334 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fabio=20Gr=C3=A4tz?= Date: Fri, 22 Sep 2023 18:46:21 +0200 Subject: [PATCH 13/16] Actually don't return any error from setHTTPClientContext at all as before MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Fabio Grätz --- clients/go/admin/auth_interceptor.go | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) diff --git a/clients/go/admin/auth_interceptor.go b/clients/go/admin/auth_interceptor.go index e2f833ce7..5bf002a49 100644 --- a/clients/go/admin/auth_interceptor.go +++ b/clients/go/admin/auth_interceptor.go @@ -100,7 +100,7 @@ func (c *proxyAuthTransport) RoundTrip(req *http.Request) (*http.Response, error } // Set up http client used in oauth2 -func setHTTPClientContext(ctx context.Context, cfg *Config, proxyCredentialsFuture *PerRPCCredentialsFuture) (context.Context, error) { +func setHTTPClientContext(ctx context.Context, cfg *Config, proxyCredentialsFuture *PerRPCCredentialsFuture) context.Context { httpClient := &http.Client{} transport := &http.Transport{} @@ -118,7 +118,7 @@ func setHTTPClientContext(ctx context.Context, cfg *Config, proxyCredentialsFutu httpClient.Transport = transport } - return context.WithValue(ctx, oauth2.HTTPClient, httpClient), nil + return context.WithValue(ctx, oauth2.HTTPClient, httpClient) } // NewAuthInterceptor creates a new grpc.UnaryClientInterceptor that forwards the grpc call and inspects the error. @@ -133,11 +133,9 @@ func setHTTPClientContext(ctx context.Context, cfg *Config, proxyCredentialsFutu // be able to find and acquire a valid AccessToken to annotate the request with. func NewAuthInterceptor(cfg *Config, tokenCache cache.TokenCache, credentialsFuture *PerRPCCredentialsFuture, proxyCredentialsFuture *PerRPCCredentialsFuture) grpc.UnaryClientInterceptor { return func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error { - ctx, err := setHTTPClientContext(ctx, cfg, proxyCredentialsFuture) - if err != nil { - return fmt.Errorf("Could not create http client for oauth! Original Error: %v", err) - } - err = invoker(ctx, method, req, reply, cc, opts...) + ctx = setHTTPClientContext(ctx, cfg, proxyCredentialsFuture) + + err := invoker(ctx, method, req, reply, cc, opts...) if err != nil { logger.Debugf(ctx, "Request failed due to [%v]. If it's an unauthenticated error, we will attempt to establish an authenticated context.", err) From 6a567755264ffa9649fa315eb6e5c16b1d76b636 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fabio=20Gr=C3=A4tz?= Date: Fri, 22 Sep 2023 18:56:29 +0200 Subject: [PATCH 14/16] Don't require github.com/golang-jwt/jwt anymore MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Fabio Grätz --- go.mod | 1 - go.sum | 2 -- 2 files changed, 3 deletions(-) diff --git a/go.mod b/go.mod index 673b0c7f1..3aacab796 100644 --- a/go.mod +++ b/go.mod @@ -6,7 +6,6 @@ require ( github.com/antihax/optional v1.0.0 github.com/flyteorg/flytestdlib v1.0.0 github.com/go-test/deep v1.0.7 - github.com/golang-jwt/jwt v3.2.2+incompatible github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b github.com/golang/protobuf v1.4.3 github.com/grpc-ecosystem/go-grpc-middleware v1.1.0 diff --git a/go.sum b/go.sum index d22db3d61..b2eab61f1 100644 --- a/go.sum +++ b/go.sum @@ -205,8 +205,6 @@ github.com/gogo/protobuf v1.2.0/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7a github.com/gogo/protobuf v1.2.1/go.mod h1:hp+jE20tsWTFYpLwKvXlhS1hjn+gTNwPg2I6zVXpSg4= github.com/gogo/protobuf v1.3.1/go.mod h1:SlYgWuQ5SjCEi6WLHjHCa1yvBfUnHcTbrrZtXPKa29o= github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q= -github.com/golang-jwt/jwt v3.2.2+incompatible h1:IfV12K8xAKAnZqdXVzCZ+TOjboZ2keLg81eXfW3O+oY= -github.com/golang-jwt/jwt v3.2.2+incompatible/go.mod h1:8pz2t5EyA70fFQQSrl6XZXzqecmYZeUEB8OUGHkxJ+I= github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b h1:VKtxabqXZkF25pY9ekfRL6a582T4P37/31XEstQ5p58= github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= github.com/golang/groupcache v0.0.0-20160516000752-02826c3e7903/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= From fd10310c40d31223078157446ab9d4c2cbd00378 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fabio=20Gr=C3=A4tz?= Date: Fri, 22 Sep 2023 19:11:02 +0200 Subject: [PATCH 15/16] Make tests pass again MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Fabio Grätz --- clients/go/admin/auth_interceptor_test.go | 11 ++++------- 1 file changed, 4 insertions(+), 7 deletions(-) diff --git a/clients/go/admin/auth_interceptor_test.go b/clients/go/admin/auth_interceptor_test.go index 7f43131be..139b54a34 100644 --- a/clients/go/admin/auth_interceptor_test.go +++ b/clients/go/admin/auth_interceptor_test.go @@ -337,8 +337,7 @@ func TestSetHTTPClientContext(t *testing.T) { t.Run("no proxy command and no proxy url", func(t *testing.T) { cfg := &Config{} - newCtx, err := setHTTPClientContext(ctx, cfg, nil) - assert.NoError(t, err) + newCtx := setHTTPClientContext(ctx, cfg, nil) httpClient, ok := newCtx.Value(oauth2.HTTPClient).(*http.Client) assert.True(t, ok) @@ -356,8 +355,7 @@ func TestSetHTTPClientContext(t *testing.T) { Host: "localhost:8080", }}, } - newCtx, err := setHTTPClientContext(ctx, cfg, nil) - assert.NoError(t, err) + newCtx := setHTTPClientContext(ctx, cfg, nil) httpClient, ok := newCtx.Value(oauth2.HTTPClient).(*http.Client) assert.True(t, ok) @@ -375,8 +373,7 @@ func TestSetHTTPClientContext(t *testing.T) { p := NewPerRPCCredentialsFuture() MaterializeProxyAuthCredentials(ctx, cfg, p) - newCtx, err := setHTTPClientContext(ctx, cfg, p) - assert.NoError(t, err) + newCtx := setHTTPClientContext(ctx, cfg, p) httpClient, ok := newCtx.Value(oauth2.HTTPClient).(*http.Client) assert.True(t, ok) @@ -394,7 +391,7 @@ func TestSetHTTPClientContext(t *testing.T) { pat.transport = testRoundTripper req, _ := http.NewRequest("GET", "http://example.com", nil) - _, err = httpClient.Do(req) + _, err := httpClient.Do(req) assert.NoError(t, err) }) } From 89ab23c2a13ae4ba84ea6ed3c4ffaab17dd970d5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fabio=20Gr=C3=A4tz?= Date: Fri, 22 Sep 2023 19:18:12 +0200 Subject: [PATCH 16/16] Lint MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Fabio Grätz --- clients/go/admin/auth_interceptor_test.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/clients/go/admin/auth_interceptor_test.go b/clients/go/admin/auth_interceptor_test.go index 139b54a34..bb304d4f6 100644 --- a/clients/go/admin/auth_interceptor_test.go +++ b/clients/go/admin/auth_interceptor_test.go @@ -371,7 +371,8 @@ func TestSetHTTPClientContext(t *testing.T) { } p := NewPerRPCCredentialsFuture() - MaterializeProxyAuthCredentials(ctx, cfg, p) + err := MaterializeProxyAuthCredentials(ctx, cfg, p) + assert.NoError(t, err) newCtx := setHTTPClientContext(ctx, cfg, p) @@ -391,7 +392,7 @@ func TestSetHTTPClientContext(t *testing.T) { pat.transport = testRoundTripper req, _ := http.NewRequest("GET", "http://example.com", nil) - _, err := httpClient.Do(req) + _, err = httpClient.Do(req) assert.NoError(t, err) }) }