diff --git a/manager/manager.go b/manager/manager.go index a67bf8a7db1..4445e9618b6 100644 --- a/manager/manager.go +++ b/manager/manager.go @@ -33,7 +33,6 @@ import ( "d7y.io/dragonfly/v2/manager/service" "d7y.io/dragonfly/v2/pkg/rpc" grpc_manager_server "d7y.io/dragonfly/v2/pkg/rpc/manager/server" - "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc" "google.golang.org/grpc" ) @@ -104,11 +103,11 @@ func New(cfg *config.Config) (*Server, error) { // Initialize GRPC server grpcService := service.NewGRPC(db, cache, searcher) - var opts []grpc.ServerOption + var enableJaeger bool if cfg.Options.Telemetry.Jaeger != "" { - opts = append(opts, grpc.UnaryInterceptor(otelgrpc.UnaryServerInterceptor()), grpc.StreamInterceptor(otelgrpc.StreamServerInterceptor())) + enableJaeger = true } - grpcServer := grpc_manager_server.New(grpcService, opts...) + grpcServer := grpc_manager_server.New(grpcService, enableJaeger) s.grpcServer = grpcServer // Initialize prometheus diff --git a/pkg/rpc/manager/server/server.go b/pkg/rpc/manager/server/server.go index 44ea2e89964..c4c97239072 100644 --- a/pkg/rpc/manager/server/server.go +++ b/pkg/rpc/manager/server/server.go @@ -27,22 +27,22 @@ import ( grpc_recovery "github.com/grpc-ecosystem/go-grpc-middleware/recovery" grpc_validator "github.com/grpc-ecosystem/go-grpc-middleware/validator" grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus" + "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc" "google.golang.org/grpc" ) -var defaultServerOptions = []grpc.ServerOption{ - grpc.StreamInterceptor(grpc_middleware.ChainStreamServer( - grpc_validator.StreamServerInterceptor(), - grpc_recovery.StreamServerInterceptor(), - grpc_prometheus.StreamServerInterceptor, - grpc_zap.StreamServerInterceptor(logger.GrpcLogger.Desugar()), - )), - grpc.UnaryInterceptor(grpc_middleware.ChainUnaryServer( - grpc_validator.UnaryServerInterceptor(), - grpc_recovery.UnaryServerInterceptor(), - grpc_prometheus.UnaryServerInterceptor, - grpc_zap.UnaryServerInterceptor(logger.GrpcLogger.Desugar()), - )), +var defaultStreamMiddleWares = []grpc.StreamServerInterceptor{ + grpc_validator.StreamServerInterceptor(), + grpc_recovery.StreamServerInterceptor(), + grpc_prometheus.StreamServerInterceptor, + grpc_zap.StreamServerInterceptor(logger.GrpcLogger.Desugar()), +} + +var defaultUnaryMiddleWares = []grpc.UnaryServerInterceptor{ + grpc_validator.UnaryServerInterceptor(), + grpc_recovery.UnaryServerInterceptor(), + grpc_prometheus.UnaryServerInterceptor, + grpc_zap.UnaryServerInterceptor(logger.GrpcLogger.Desugar()), } // ManagerServer is the server API for Manager service. @@ -66,8 +66,17 @@ type proxy struct { manager.UnimplementedManagerServer } -func New(managerServer ManagerServer, opts ...grpc.ServerOption) *grpc.Server { - grpcServer := grpc.NewServer(append(defaultServerOptions, opts...)...) +func New(managerServer ManagerServer, jaeger bool, opts ...grpc.ServerOption) *grpc.Server { + if jaeger { + defaultStreamMiddleWares = append(defaultStreamMiddleWares, otelgrpc.StreamServerInterceptor()) + defaultUnaryMiddleWares = append(defaultUnaryMiddleWares, otelgrpc.UnaryServerInterceptor()) + } + + grpcServer := grpc.NewServer(append([]grpc.ServerOption{ + grpc.StreamInterceptor(grpc_middleware.ChainStreamServer(defaultStreamMiddleWares...)), + grpc.UnaryInterceptor(grpc_middleware.ChainUnaryServer(defaultUnaryMiddleWares...)), + }, opts...)...) + manager.RegisterManagerServer(grpcServer, &proxy{server: managerServer}) return grpcServer }