From 254fac1c549a7930b32df10d010a9f5ff1a30ee6 Mon Sep 17 00:00:00 2001 From: Edbert Linardi Date: Wed, 19 Apr 2023 17:38:18 -0700 Subject: [PATCH] Add an option to enable metrics collection in sansshell-server (#219) * add metrics instrumentations * init metric during server initialization * change metrics structure to allow registration * Remove comments * add comments * fix npe * Handle metric error. Add proxy metrics * handle metric error * handle err * rm irrelevant files * add this one back * add comments * metric desc variables * change singleton to contextbased * remove unused file * Delete launch.json * Add comments Correct otel metrics function definition * update comment and change otelrecorder to interface * add comments * add test for metrics package * fix comment and remove debug statement. * naming consistency * add comment, and change beahvior of RegisterInt64Gauge to allow overwriting gauge * fix comment * fix comment typo and inconsistency * add metrics recorder to sansshell server * server metrics endpoint * fix metric name * disable otelgrpc metrics collection * add method as an attribute of authzDeniedPolicyCounter * add to other authz failure counters --- auth/opa/rpcauth/rpcauth.go | 7 +-- cmd/proxy-server/server/server.go | 14 +++--- cmd/sansshell-server/main.go | 22 ++++++++++ cmd/sansshell-server/server/server.go | 63 +++++++++++++++++++++++++-- 4 files changed, 95 insertions(+), 11 deletions(-) diff --git a/auth/opa/rpcauth/rpcauth.go b/auth/opa/rpcauth/rpcauth.go index 441e35e7..40671820 100644 --- a/auth/opa/rpcauth/rpcauth.go +++ b/auth/opa/rpcauth/rpcauth.go @@ -23,6 +23,7 @@ import ( "strings" "github.com/go-logr/logr" + "go.opentelemetry.io/otel/attribute" "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" @@ -129,7 +130,7 @@ func (g *Authorizer) Eval(ctx context.Context, input *RPCAuthInput) error { if errRegister != nil { logger.V(1).Error(errRegister, "failed to register "+authzFailureEvalErrorCounterName) } - errCounter := recorder.AddInt64Counter(ctx, authzFailureEvalErrorCounterName, 1) + errCounter := recorder.AddInt64Counter(ctx, authzFailureEvalErrorCounterName, 1, attribute.String("method", input.Method)) if errCounter != nil { logger.V(1).Error(errCounter, "failed to add counter "+authzFailureEvalErrorCounterName) } @@ -144,7 +145,7 @@ func (g *Authorizer) Eval(ctx context.Context, input *RPCAuthInput) error { if errRegister != nil { logger.V(1).Error(errRegister, "failed to register "+authzDenialHintErrorCounterName) } - errCounter := recorder.AddInt64Counter(ctx, authzDenialHintErrorCounterName, 1) + errCounter := recorder.AddInt64Counter(ctx, authzDenialHintErrorCounterName, 1, attribute.String("method", input.Method)) if errCounter != nil { logger.V(1).Error(errCounter, "failed to add counter "+authzDenialHintErrorCounterName) } @@ -158,7 +159,7 @@ func (g *Authorizer) Eval(ctx context.Context, input *RPCAuthInput) error { if errRegister != nil { logger.V(1).Error(errRegister, "failed to register "+authzDeniedPolicyCounterName) } - errCounter := recorder.AddInt64Counter(ctx, authzDeniedPolicyCounterName, 1) + errCounter := recorder.AddInt64Counter(ctx, authzDeniedPolicyCounterName, 1, attribute.String("method", input.Method)) if errCounter != nil { logger.V(1).Error(errCounter, "failed to add counter "+authzDeniedPolicyCounterName) } diff --git a/cmd/proxy-server/server/server.go b/cmd/proxy-server/server/server.go index b171013c..ee7d80f2 100644 --- a/cmd/proxy-server/server/server.go +++ b/cmd/proxy-server/server/server.go @@ -34,6 +34,7 @@ import ( "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promhttp" "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc" + otelmetric "go.opentelemetry.io/otel/metric" "google.golang.org/grpc" "github.com/Snowflake-Labs/sansshell/auth/mtls" @@ -319,19 +320,22 @@ func WithMetricsPort(addr string) Option { // WithOtelTracing adds the OpenTelemetry gRPC interceptors to all servers and clients. // The interceptors collect and export tracing data for gRPC requests and responses -func WithOtelTracing(interceptorOpt otelgrpc.Option) Option { +func WithOtelTracing(interceptorOpts ...otelgrpc.Option) Option { return optionFunc(func(_ context.Context, r *runState) error { + interceptorOpts = append(interceptorOpts, + otelgrpc.WithMeterProvider(otelmetric.NewNoopMeterProvider()), // We don't want otel grpc metrics so discard them + ) r.unaryClientInterceptors = append(r.unaryClientInterceptors, - otelgrpc.UnaryClientInterceptor(interceptorOpt), + otelgrpc.UnaryClientInterceptor(interceptorOpts...), ) r.streamClientInterceptors = append(r.streamClientInterceptors, - otelgrpc.StreamClientInterceptor(interceptorOpt), + otelgrpc.StreamClientInterceptor(interceptorOpts...), ) r.unaryInterceptors = append(r.unaryInterceptors, - otelgrpc.UnaryServerInterceptor(interceptorOpt), + otelgrpc.UnaryServerInterceptor(interceptorOpts...), ) r.streamInterceptors = append(r.streamInterceptors, - otelgrpc.StreamServerInterceptor(interceptorOpt), + otelgrpc.StreamServerInterceptor(interceptorOpts...), ) return nil }) diff --git a/cmd/sansshell-server/main.go b/cmd/sansshell-server/main.go index dd5489e1..1bae9817 100644 --- a/cmd/sansshell-server/main.go +++ b/cmd/sansshell-server/main.go @@ -31,6 +31,9 @@ import ( "github.com/go-logr/logr" "github.com/go-logr/stdr" + prometheus_exporter "go.opentelemetry.io/otel/exporters/prometheus" + "go.opentelemetry.io/otel/metric/global" + otelmetricsdk "go.opentelemetry.io/otel/sdk/metric" "google.golang.org/grpc" channelz "google.golang.org/grpc/channelz/service" "google.golang.org/grpc/reflection" @@ -46,6 +49,7 @@ import ( "github.com/Snowflake-Labs/sansshell/auth/opa/rpcauth" "github.com/Snowflake-Labs/sansshell/cmd/sansshell-server/server" "github.com/Snowflake-Labs/sansshell/cmd/util" + "github.com/Snowflake-Labs/sansshell/telemetry/metrics" // Import the server modules you want to expose, they automatically register @@ -75,6 +79,7 @@ var ( policyFile = flag.String("policy-file", "", "Path to a file with an OPA policy. If empty, uses --policy.") hostport = flag.String("hostport", "localhost:50042", "Where to listen for connections.") debugport = flag.String("debugport", "localhost:50044", "A separate port for http debug pages. Set to an empty string to disable.") + metricsport = flag.String("metricsport", "localhost:50047", "A separate port for http debug pages. Set to an empty string to disable.") credSource = flag.String("credential-source", mtlsFlags.Name(), fmt.Sprintf("Method used to obtain mTLS credentials (one of [%s])", strings.Join(mtls.Loaders(), ","))) verbosity = flag.Int("v", 0, "Verbosity level. > 0 indicates more extensive logging") validate = flag.Bool("validate", false, "If true will evaluate the policy and then exit (non-zero on error)") @@ -124,8 +129,23 @@ func main() { logger := stdr.New(log.New(os.Stderr, "", logOpts)).WithName("sanshell-server") stdr.SetVerbosity(*verbosity) + // Setup exporter using the default prometheus registry + exporter, err := prometheus_exporter.New() + if err != nil { + log.Fatalf("failed to create prometheus exporter: %v\n", err) + } + global.SetMeterProvider(otelmetricsdk.NewMeterProvider( + otelmetricsdk.WithReader(exporter), + )) + meter := global.Meter("sansshell-server") + recorder, err := metrics.NewOtelRecorder(meter, metrics.WithMetricNamePrefix("sansshell-server")) + if err != nil { + log.Fatalf("failed to create OtelRecorder: %v\n", err) + } + policy := util.ChoosePolicy(logger, defaultPolicy, *policyFlag, *policyFile) ctx := logr.NewContext(context.Background(), logger) + ctx = metrics.NewContextWithRecorder(ctx, recorder) parsed, err := opa.NewAuthzPolicy(ctx, policy, opa.WithDenialHintsQuery("data.sansshell.authz.denial_hints")) if err != nil { @@ -146,5 +166,7 @@ func main() { server.WithRawServerOption(func(s *grpc.Server) { reflection.Register(s) }), server.WithRawServerOption(func(s *grpc.Server) { channelz.RegisterChannelzServiceToServer(s) }), server.WithDebugPort(*debugport), + server.WithMetricsPort(*metricsport), + server.WithMetricsRecorder(recorder), ) } diff --git a/cmd/sansshell-server/server/server.go b/cmd/sansshell-server/server/server.go index 9ca2d738..e1f56cd4 100644 --- a/cmd/sansshell-server/server/server.go +++ b/cmd/sansshell-server/server/server.go @@ -28,14 +28,18 @@ import ( "os" "github.com/go-logr/logr" + grpc_prometheus "github.com/grpc-ecosystem/go-grpc-middleware/providers/prometheus" + "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promhttp" "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc" + otelmetric "go.opentelemetry.io/otel/metric" "google.golang.org/grpc" "github.com/Snowflake-Labs/sansshell/auth/mtls" "github.com/Snowflake-Labs/sansshell/auth/opa" "github.com/Snowflake-Labs/sansshell/auth/opa/rpcauth" "github.com/Snowflake-Labs/sansshell/server" + "github.com/Snowflake-Labs/sansshell/telemetry/metrics" "google.golang.org/grpc/credentials" ) @@ -49,6 +53,9 @@ type runState struct { hostport string debugport string debughandler *http.ServeMux + metricsport string + metricshandler *http.ServeMux + metricsRecorder metrics.MetricsRecorder policy *opa.AuthzPolicy justification bool justificationFunc func(string) error @@ -210,15 +217,58 @@ func WithDebugPort(addr string) Option { }) } +// WithMetricsRecorder enables metric instrumentations by inserting grpc metric interceptors +// and attaching recorder to the server runstate +func WithMetricsRecorder(recorder metrics.MetricsRecorder) Option { + return optionFunc(func(ctx context.Context, r *runState) error { + r.metricsRecorder = recorder + // Instrument gRPC Server + grpcServerMetrics := grpc_prometheus.NewServerMetrics( + grpc_prometheus.WithServerHandlingTimeHistogram(), + ) + errRegister := prometheus.Register(grpcServerMetrics) + if errRegister != nil { + return fmt.Errorf("fail to register grpc server metrics: %s", errRegister) + } + r.unaryInterceptors = append(r.unaryInterceptors, + metrics.UnaryServerMetricsInterceptor(recorder), + grpcServerMetrics.UnaryServerInterceptor(), + ) + r.streamInterceptors = append(r.streamInterceptors, + metrics.StreamServerMetricsInterceptor(recorder), + grpcServerMetrics.StreamServerInterceptor(), + ) + return nil + }) +} + +// WithMetricsPort opens a HTTP endpoint for publishing metrics at the given addr +// and initializes metrics exporter. +// This endpoint is to be scraped by a Prometheus-style metrics scraper. +// It can be accessed at http://{addr}/metrics +func WithMetricsPort(addr string) Option { + return optionFunc(func(ctx context.Context, r *runState) error { + mux := http.NewServeMux() + mux.Handle("/metrics", promhttp.Handler()) + r.metricsport = addr + r.metricshandler = mux + + return nil + }) +} + // WithOtelTracing adds the OpenTelemetry gRPC interceptors to both stream and unary servers // The interceptors collect and export tracing data for gRPC requests and responses -func WithOtelTracing(interceptorOpt otelgrpc.Option) Option { +func WithOtelTracing(interceptorOpts ...otelgrpc.Option) Option { return optionFunc(func(_ context.Context, r *runState) error { + interceptorOpts = append(interceptorOpts, + otelgrpc.WithMeterProvider(otelmetric.NewNoopMeterProvider()), // We don't want otel grpc metrics so discard them + ) r.unaryInterceptors = append(r.unaryInterceptors, - otelgrpc.UnaryServerInterceptor(interceptorOpt), + otelgrpc.UnaryServerInterceptor(interceptorOpts...), ) r.streamInterceptors = append(r.streamInterceptors, - otelgrpc.StreamServerInterceptor(interceptorOpt), + otelgrpc.StreamServerInterceptor(interceptorOpts...), ) return nil }) @@ -244,6 +294,13 @@ func Run(ctx context.Context, opts ...Option) { }() } + // Start metrics endpoint if both metrics port and handler are configured + if rs.metricshandler != nil && rs.metricsport != "" { + go func() { + rs.logger.Error(http.ListenAndServe(rs.metricsport, rs.metricshandler), "Metrics handler unexpectedly exited") + }() + } + creds, err := extractTransportCredentialsFromRunState(ctx, rs) if err != nil { rs.logger.Error(err, "unable to extract transport credentials from runstate", "credsource", rs.credSource)