Skip to content

Commit

Permalink
address comments.
Browse files Browse the repository at this point in the history
Signed-off-by: morvencao <[email protected]>
  • Loading branch information
morvencao committed Sep 5, 2024
1 parent e262f36 commit d0735fa
Show file tree
Hide file tree
Showing 6 changed files with 178 additions and 144 deletions.
2 changes: 1 addition & 1 deletion cmd/maestro/environments/framework.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ func (e *Env) LoadClients() error {
glog.Infof("Using Mock GRPC Authorizer")
e.Clients.GRPCAuthorizer = grpcauthorizer.NewMockGRPCAuthorizer()
} else {
kubeConfig, err := clientcmd.BuildConfigFromFlags("", e.Config.GRPCServer.GRPCAuthrizerConfig)
kubeConfig, err := clientcmd.BuildConfigFromFlags("", e.Config.GRPCServer.GRPCAuthorizerConfig)
if err != nil {
glog.Warningf("Unable to create kube client config: %s", err.Error())
// fallback to in-cluster config
Expand Down
1 change: 0 additions & 1 deletion cmd/maestro/server/api_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,6 @@ func NewAPIServer(eventBroadcaster *event.EventBroadcaster) Server {
Handler: mainHandler,
}

// TODO: support authn and authz for gRPC
if env().Config.GRPCServer.EnableGRPCServer {
s.grpcServer = NewGRPCServer(env().Services.Resources(), eventBroadcaster, *env().Config.GRPCServer, env().Clients.GRPCAuthorizer)
}
Expand Down
169 changes: 169 additions & 0 deletions cmd/maestro/server/grpc_interceptor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,169 @@
package server

import (
"context"
"fmt"
"strings"

"github.com/golang/glog"
"github.com/openshift-online/maestro/pkg/client/grpcauthorizer"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/peer"
"google.golang.org/grpc/status"
)

// Context key type defined to avoid collisions in other pkgs using context
// See https://golang.org/pkg/context/#WithValue
type contextKey string

const (
contextUserKey contextKey = "user"
contextGroupsKey contextKey = "groups"
)

func newContextWithIdentity(ctx context.Context, user string, groups []string) context.Context {
ctx = context.WithValue(ctx, contextUserKey, user)
return context.WithValue(ctx, contextGroupsKey, groups)
}

// identityFromCertificate retrieves the user and groups from the client certificate if they are present.
func identityFromCertificate(ctx context.Context) (string, []string, error) {
p, ok := peer.FromContext(ctx)
if !ok {
return "", nil, status.Error(codes.Unauthenticated, "no peer found")
}

tlsAuth, ok := p.AuthInfo.(credentials.TLSInfo)
if !ok {
return "", nil, status.Error(codes.Unauthenticated, "unexpected peer transport credentials")
}

if len(tlsAuth.State.VerifiedChains) == 0 || len(tlsAuth.State.VerifiedChains[0]) == 0 {
return "", nil, status.Error(codes.Unauthenticated, "could not verify peer certificate")
}

if tlsAuth.State.VerifiedChains[0][0] == nil {
return "", nil, status.Error(codes.Unauthenticated, "could not verify peer certificate")
}

user := tlsAuth.State.VerifiedChains[0][0].Subject.CommonName
groups := tlsAuth.State.VerifiedChains[0][0].Subject.Organization

if user == "" {
return "", nil, status.Error(codes.Unauthenticated, "could not find user in peer certificate")
}

if len(groups) == 0 {
return "", nil, status.Error(codes.Unauthenticated, "could not find group in peer certificate")
}

return user, groups, nil
}

// identityFromToken retrieves the user and groups from the access token if they are present.
func identityFromToken(ctx context.Context, grpcAuthorizer grpcauthorizer.GRPCAuthorizer) (string, []string, error) {
// Extract the metadata from the context
md, ok := metadata.FromIncomingContext(ctx)
if !ok {
return "", nil, status.Error(codes.InvalidArgument, "missing metadata")
}

// Extract the access token from the metadata
authorization, ok := md["authorization"]
if !ok || len(authorization) == 0 {
return "", nil, status.Error(codes.Unauthenticated, "invalid token")
}

token := strings.TrimPrefix(authorization[0], "Bearer ")
// Extract the user and groups from the access token
return grpcAuthorizer.TokenReview(ctx, token)
}

// newAuthUnaryInterceptor creates a new unary interceptor that retrieves the user and groups from
// the access token in the incoming RPC context. If retrieval from the token fails, it falls back to
// retrieving the user and groups from the client certificate.
// It then creates a new context with the retrieved user and groups before invoking the provided handler.
func newAuthUnaryInterceptor(authNType string, authorizer grpcauthorizer.GRPCAuthorizer) grpc.UnaryServerInterceptor {
return func(
ctx context.Context,
req interface{},
info *grpc.UnaryServerInfo,
handler grpc.UnaryHandler,
) (interface{}, error) {
var user string
var groups []string
var err error
switch authNType {
case "token":
user, groups, err = identityFromToken(ctx, authorizer)
if err != nil {
glog.Errorf("unable to get user and groups from token: %v", err)
return nil, err
}
case "mtls":
user, groups, err = identityFromCertificate(ctx)
if err != nil {
glog.Errorf("unable to get user and groups from certificate: %v", err)
return nil, err
}
default:
return nil, fmt.Errorf("unsupported authentication type %s", authNType)
}

// call the handler with the new context containing the user and groups
return handler(newContextWithIdentity(ctx, user, groups), req)
}
}

// wrappedStream wraps a grpc.ServerStream associated with an incoming RPC, and
// a custom context containing the user and groups derived from the client certificate
// specified in the incoming RPC metadata
type wrappedStream struct {
grpc.ServerStream
ctx context.Context
}

func (w *wrappedStream) Context() context.Context {
return w.ctx
}

func newWrappedStream(ctx context.Context, s grpc.ServerStream) grpc.ServerStream {
return &wrappedStream{s, ctx}
}

// newAuthStreamInterceptor creates a new stream interceptor that looks up the client certificate from the incoming RPC context,
// retrieves the user and groups from it and creates a new context with the user and groups before invoking the provided handler.
// otherwise, it falls back retrieving the user and groups from the access token.
func newAuthStreamInterceptor(authNType string, authorizer grpcauthorizer.GRPCAuthorizer) grpc.StreamServerInterceptor {
return func(
srv interface{},
ss grpc.ServerStream,
info *grpc.StreamServerInfo,
handler grpc.StreamHandler,
) error {
var user string
var groups []string
var err error
switch authNType {
case "token":
user, groups, err = identityFromToken(ss.Context(), authorizer)
if err != nil {
glog.Errorf("unable to get user and groups from token: %v", err)
return err
}
case "mtls":
user, groups, err = identityFromCertificate(ss.Context())
if err != nil {
glog.Errorf("unable to get user and groups from certificate: %v", err)
return err
}
default:
return fmt.Errorf("unsupported authentication Type %s", authNType)
}

return handler(srv, newWrappedStream(newContextWithIdentity(ss.Context(), user, groups), ss))
}
}
144 changes: 6 additions & 138 deletions cmd/maestro/server/grpc_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"fmt"
"net"
"os"
"strings"
"time"

ce "github.com/cloudevents/sdk-go/v2"
Expand All @@ -16,12 +15,8 @@ import (
"github.com/golang/glog"
"github.com/google/uuid"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/keepalive"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/peer"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/types/known/emptypb"
"k8s.io/klog/v2"
pbv1 "open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/grpc/protobuf/v1"
Expand All @@ -40,137 +35,6 @@ import (
"github.com/openshift-online/maestro/pkg/services"
)

// Context key type defined to avoid collisions in other pkgs using context
// See https://golang.org/pkg/context/#WithValue
type contextKey string

const (
contextUserKey contextKey = "user"
contextGroupsKey contextKey = "groups"
)

func newContextWithIdentity(ctx context.Context, user string, groups []string) context.Context {
ctx = context.WithValue(ctx, contextUserKey, user)
return context.WithValue(ctx, contextGroupsKey, groups)
}

// identityFromCertificate retrieves the user and groups from the client certificate if they are present.
func identityFromCertificate(ctx context.Context) (string, []string, error) {
p, ok := peer.FromContext(ctx)
if !ok {
return "", nil, status.Error(codes.Unauthenticated, "no peer found")
}

tlsAuth, ok := p.AuthInfo.(credentials.TLSInfo)
if !ok {
return "", nil, status.Error(codes.Unauthenticated, "unexpected peer transport credentials")
}

if len(tlsAuth.State.VerifiedChains) == 0 || len(tlsAuth.State.VerifiedChains[0]) == 0 {
return "", nil, status.Error(codes.Unauthenticated, "could not verify peer certificate")
}

if tlsAuth.State.VerifiedChains[0][0] == nil {
return "", nil, status.Error(codes.Unauthenticated, "could not verify peer certificate")
}

user := tlsAuth.State.VerifiedChains[0][0].Subject.CommonName
groups := tlsAuth.State.VerifiedChains[0][0].Subject.Organization

if user == "" {
return "", nil, status.Error(codes.Unauthenticated, "could not find user in peer certificate")
}

if len(groups) == 0 {
return "", nil, status.Error(codes.Unauthenticated, "could not find group in peer certificate")
}

return user, groups, nil
}

// identityFromToken retrieves the user and groups from the access token if they are present.
func identityFromToken(ctx context.Context, grpcAuthorizer grpcauthorizer.GRPCAuthorizer) (string, []string, error) {
// Extract the metadata from the context
md, ok := metadata.FromIncomingContext(ctx)
if !ok {
return "", nil, status.Error(codes.InvalidArgument, "missing metadata")
}

// Extract the access token from the metadata
authorization, ok := md["authorization"]
if !ok || len(authorization) == 0 {
return "", nil, status.Error(codes.Unauthenticated, "invalid token")
}

token := strings.TrimPrefix(authorization[0], "Bearer ")
// Extract the user and groups from the access token
return grpcAuthorizer.TokenReview(ctx, token)
}

// newAuthUnaryInterceptor creates a new unary interceptor that looks up the client certificate from the incoming RPC context,
// retrieves the user and groups from it and creates a new context with the user and groups before invoking the provided handler.
// otherwise, it falls back retrieving the user and groups from the access token.
func newAuthUnaryInterceptor(authorizer grpcauthorizer.GRPCAuthorizer) grpc.UnaryServerInterceptor {
return func(
ctx context.Context,
req interface{},
info *grpc.UnaryServerInfo,
handler grpc.UnaryHandler,
) (interface{}, error) {
user, groups, err := identityFromToken(ctx, authorizer)
if err != nil {
glog.Warningf("unable to get user and groups from token: %v, fall back to certificate", err)
user, groups, err = identityFromCertificate(ctx)
if err != nil {
glog.Errorf("unable to get user and groups from certificate: %v", err)
return nil, err
}
}

return handler(newContextWithIdentity(ctx, user, groups), req)
}
}

// wrappedStream wraps a grpc.ServerStream associated with an incoming RPC, and
// a custom context containing the user and groups derived from the client certificate
// specified in the incoming RPC metadata
type wrappedStream struct {
grpc.ServerStream
ctx context.Context
}

func (w *wrappedStream) Context() context.Context {
return w.ctx
}

func newWrappedStream(ctx context.Context, s grpc.ServerStream) grpc.ServerStream {
return &wrappedStream{s, ctx}
}

// newAuthStreamInterceptor creates a new stream interceptor that looks up the client certificate from the incoming RPC context,
// retrieves the user and groups from it and creates a new context with the user and groups before invoking the provided handler.
// otherwise, it falls back retrieving the user and groups from the access token.
func newAuthStreamInterceptor(authorizer grpcauthorizer.GRPCAuthorizer) grpc.StreamServerInterceptor {
return func(
srv interface{},
ss grpc.ServerStream,
info *grpc.StreamServerInfo,
handler grpc.StreamHandler,
) error {
user, groups, err := identityFromToken(ss.Context(), authorizer)
if err != nil {
glog.Warningf("unable to get user and groups from token: %v, fall back to certificate", err)
user, groups, err = identityFromCertificate(ss.Context())
if err != nil {
glog.Errorf("unable to get user and groups from certificate: %v", err)
return err
}
}

return handler(srv, newWrappedStream(newContextWithIdentity(ss.Context(), user, groups), ss))
}
}

// GRPCServer includes a gRPC server and a resource service
type GRPCServer struct {
pbv1.UnimplementedCloudEventServiceServer
Expand Down Expand Up @@ -238,10 +102,14 @@ func NewGRPCServer(resourceService services.ResourceService, eventBroadcaster *e
tlsConfig.ClientCAs = certPool
tlsConfig.ClientAuth = tls.RequireAndVerifyClientCert

grpcServerOptions = append(grpcServerOptions, grpc.Creds(credentials.NewTLS(tlsConfig)), grpc.UnaryInterceptor(newAuthUnaryInterceptor(grpcAuthorizer)), grpc.StreamInterceptor(newAuthStreamInterceptor(grpcAuthorizer)))
grpcServerOptions = append(grpcServerOptions, grpc.Creds(credentials.NewTLS(tlsConfig)),
grpc.UnaryInterceptor(newAuthUnaryInterceptor(config.GRPCAuthNType, grpcAuthorizer)),
grpc.StreamInterceptor(newAuthStreamInterceptor(config.GRPCAuthNType, grpcAuthorizer)))
glog.Infof("Serving gRPC service with mTLS at %s", config.ServerBindPort)
} else {
grpcServerOptions = append(grpcServerOptions, grpc.Creds(credentials.NewTLS(tlsConfig)), grpc.UnaryInterceptor(newAuthUnaryInterceptor(grpcAuthorizer)), grpc.StreamInterceptor(newAuthStreamInterceptor(grpcAuthorizer)))
grpcServerOptions = append(grpcServerOptions, grpc.Creds(credentials.NewTLS(tlsConfig)),
grpc.UnaryInterceptor(newAuthUnaryInterceptor(config.GRPCAuthNType, grpcAuthorizer)),
grpc.StreamInterceptor(newAuthStreamInterceptor(config.GRPCAuthNType, grpcAuthorizer)))
glog.Infof("Serving gRPC service with TLS at %s", config.ServerBindPort)
}
} else {
Expand Down
2 changes: 0 additions & 2 deletions pkg/client/grpcauthorizer/kube_authorizer.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,6 @@ func (k *KubeGRPCAuthorizer) AccessReview(ctx context.Context, action, resourceT
switch resourceType {
case "source":
nonResourceUrl = fmt.Sprintf("/sources/%s", resource)
case "cluster":
nonResourceUrl = fmt.Sprintf("/clusters/%s", resource)
default:
return false, fmt.Errorf("unsupported resource type: %s", resourceType)
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/config/grpc_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ type GRPCServerConfig struct {
TLSCertFile string `json:"grpc_tls_cert_file"`
TLSKeyFile string `json:"grpc_tls_key_file"`
GRPCAuthNType string `json:"grpc_authn_type"`
GRPCAuthrizerConfig string `json:"grpc_authorizer_config"`
GRPCAuthorizerConfig string `json:"grpc_authorizer_config"`
ClientCAFile string `json:"grpc_client_ca_file"`
ServerBindPort string `json:"server_bind_port"`
BrokerBindPort string `json:"broker_bind_port"`
Expand Down Expand Up @@ -45,6 +45,6 @@ func (s *GRPCServerConfig) AddFlags(fs *pflag.FlagSet) {
fs.StringVar(&s.TLSCertFile, "grpc-tls-cert-file", "", "The path to the tls.crt file")
fs.StringVar(&s.TLSKeyFile, "grpc-tls-key-file", "", "The path to the tls.key file")
fs.StringVar(&s.GRPCAuthNType, "grpc-authn-type", "mock", "Specify the gRPC authentication type (e.g., mock, mtls or token)")
fs.StringVar(&s.GRPCAuthrizerConfig, "grpc-authorizer-config", "", "Path to the gRPC authorizer configuration file")
fs.StringVar(&s.GRPCAuthorizerConfig, "grpc-authorizer-config", "", "Path to the gRPC authorizer configuration file")
fs.StringVar(&s.ClientCAFile, "grpc-client-ca-file", "", "The path to the client ca file, must specify if using mtls authentication type")
}

0 comments on commit d0735fa

Please sign in to comment.