Skip to content

Commit

Permalink
Fix grpc prometheus metrics (#4172)
Browse files: browse the repository at this point in the history
* fix grpc prom metrics

* fix test

---------

Co-authored-by: JamesMurkin <[email protected]>
  • Loading branch information
d80tb7 and JamesMurkin authored Jan 29, 2025
1 parent d43a9d9 commit a5afc8a
Show file tree
Hide file tree
Showing 9 changed files with 36 additions and 30 deletions.
1 change: 0 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ require (
github.com/golang/protobuf v1.5.4
github.com/google/go-cmp v0.6.0 // indirect
github.com/google/uuid v1.6.0
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0
github.com/grpc-ecosystem/grpc-gateway v1.16.0
github.com/hashicorp/go-memdb v1.3.4
github.com/hashicorp/go-multierror v1.1.1
Expand Down
2 changes: 0 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -300,8 +300,6 @@ github.com/grpc-ecosystem/go-grpc-middleware/providers/prometheus v1.0.1 h1:qnpS
github.com/grpc-ecosystem/go-grpc-middleware/providers/prometheus v1.0.1/go.mod h1:lXGCsh6c22WGtjr+qGHj1otzZpV/1kwTMAqkwZsnWRU=
github.com/grpc-ecosystem/go-grpc-middleware/v2 v2.2.0 h1:kQ0NI7W1B3HwiN5gAYtY+XFItDPbLBwYRxAqbFTyDes=
github.com/grpc-ecosystem/go-grpc-middleware/v2 v2.2.0/go.mod h1:zrT2dxOAjNFPRGjTUe2Xmb4q4YdUwVvQFV6xiCSf+z0=
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 h1:Ovs26xHkKqVztRpIrF/92BcuyuQ/YW4NSIpoGtfXNho=
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0/go.mod h1:8NvIoxWQoOIhqOTXgfV/d3M/q6VIi02HzZEHgUlZvzk=
github.com/grpc-ecosystem/grpc-gateway v1.16.0 h1:gmcG1KaJ57LophUzW0Hy8NmPhnMZb4M0+kPpLofRdBo=
github.com/grpc-ecosystem/grpc-gateway v1.16.0/go.mod h1:BDjrQk3hbvj6Nolgz8mAMFbcEtjT1g+wF4CSlocrBnw=
github.com/gsterjov/go-libsecret v0.0.0-20161001094733-a6f4afe4910c h1:6rhixN/i8ZofjG1Y75iExal34USq5p+wiN1tpie8IrU=
Expand Down
3 changes: 0 additions & 3 deletions internal/binoculars/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@ import (
"os"
"sync"

grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus"

"github.com/armadaproject/armada/internal/binoculars/configuration"
"github.com/armadaproject/armada/internal/binoculars/server"
"github.com/armadaproject/armada/internal/binoculars/service"
Expand Down Expand Up @@ -48,7 +46,6 @@ func StartUp(config *configuration.BinocularsConfig) (func(), *sync.WaitGroup) {
cordonService := service.NewKubernetesCordonService(config.Cordon, permissionsChecker, kubernetesClientProvider)
binocularsServer := server.NewBinocularsServer(logService, cordonService)
binoculars.RegisterBinocularsServer(grpcServer, binocularsServer)
grpc_prometheus.Register(grpcServer)

grpcCommon.Listen(config.GrpcPort, grpcServer, wg)

Expand Down
5 changes: 2 additions & 3 deletions internal/common/grpc/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@ import (
grpc_auth "github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/auth"
grpc_logging "github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/logging"
grpc_recovery "github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/recovery"

"github.com/prometheus/client_golang/prometheus"

"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials"
Expand Down Expand Up @@ -76,8 +76,7 @@ func setupPromMetrics() *grpc_prometheus.ServerMetrics {
grpc_prometheus.WithHistogramBuckets([]float64{0.001, 0.01, 0.1, 0.3, 0.6, 1, 3, 6, 9, 20, 30, 60, 90, 120}),
),
)
reg := prometheus.NewRegistry()
reg.MustRegister(srvMetrics)
prometheus.MustRegister(srvMetrics)
return srvMetrics
}

Expand Down
11 changes: 7 additions & 4 deletions internal/executor/application.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
"time"

"github.com/go-playground/validator/v10"
grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
grpc_prometheus "github.com/grpc-ecosystem/go-grpc-middleware/providers/prometheus"
"github.com/prometheus/client_golang/prometheus"
"google.golang.org/grpc"
"google.golang.org/grpc/keepalive"
Expand Down Expand Up @@ -284,12 +284,15 @@ func setupExecutorApiComponents(
}

// createConnectionToApi opens a gRPC client connection for connectionDetails
// through client.CreateApiConnectionWithCallOptions. It passes
// grpc.MaxCallRecvMsgSize(maxMessageSizeBytes) as the only call option and
// grpcConfig as the keepalive parameters, and returns whatever
// CreateApiConnectionWithCallOptions returns.
//
// NOTE: this span is a rendered diff without +/- markers, so the lines removed
// and the lines added by the commit are interleaved; it is not compilable as
// shown. The inline comments below mark which side each changed line is on.
func createConnectionToApi(connectionDetails client.ApiConnectionDetails, maxMessageSizeBytes int, grpcConfig keepalive.ClientParameters) (*grpc.ClientConn, error) {
// Removed by this commit: package-level switch from the old
// go-grpc-prometheus import.
grpc_prometheus.EnableClientHandlingTimeHistogram()
// Added by this commit: an explicit ClientMetrics collector with the
// handling-time histogram option, registered via prometheus.MustRegister.
clientMetrics := grpc_prometheus.NewClientMetrics(
grpc_prometheus.WithClientHandlingTimeHistogram(),
)
// NOTE(review): MustRegister panics when registration fails. A new collector
// is built and registered on every call, so a second call in the same
// process would presumably hit a duplicate-registration panic — confirm this
// function is only called once.
prometheus.MustRegister(clientMetrics)
return client.CreateApiConnectionWithCallOptions(
&connectionDetails,
[]grpc.CallOption{grpc.MaxCallRecvMsgSize(maxMessageSizeBytes)},
// Removed by this commit: the package-level client interceptors.
grpc.WithChainUnaryInterceptor(grpc_prometheus.UnaryClientInterceptor),
grpc.WithChainStreamInterceptor(grpc_prometheus.StreamClientInterceptor),
// Added by this commit: interceptors obtained from the clientMetrics
// instance created above.
grpc.WithChainUnaryInterceptor(clientMetrics.UnaryClientInterceptor()),
grpc.WithChainStreamInterceptor(clientMetrics.StreamClientInterceptor()),
grpc.WithKeepaliveParams(grpcConfig),
)
}
Expand Down
15 changes: 8 additions & 7 deletions internal/scheduler/leader/leader_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import (
"strings"
"sync"

grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
grpc_prometheus "github.com/grpc-ecosystem/go-grpc-middleware/providers/prometheus"
"github.com/pkg/errors"
"google.golang.org/grpc"

Expand All @@ -24,14 +24,16 @@ type LeaderConnectionProvider struct {
leaderConfig configuration.LeaderConfig
connectionLock sync.Mutex
connectionByName map[string]*grpc.ClientConn
metrics *grpc_prometheus.ClientMetrics
}

// NewLeaderConnectionProvider returns a LeaderConnectionProvider holding the
// given leaderController and leaderConfig, a fresh mutex, and an empty
// connectionByName map.
//
// NOTE: this span is a rendered diff without +/- markers. The first signature
// line below is the one removed by the commit; the second is the one added,
// which takes an extra metrics parameter that is stored in the metrics field.
func NewLeaderConnectionProvider(leaderController LeaderController, leaderConfig configuration.LeaderConfig) *LeaderConnectionProvider {
func NewLeaderConnectionProvider(leaderController LeaderController, leaderConfig configuration.LeaderConfig, metrics *grpc_prometheus.ClientMetrics) *LeaderConnectionProvider {
return &LeaderConnectionProvider{
leaderController: leaderController,
leaderConfig: leaderConfig,
connectionLock: sync.Mutex{},
connectionByName: map[string]*grpc.ClientConn{},
// Added by this commit together with the new parameter.
metrics: metrics,
}
}

Expand Down Expand Up @@ -60,7 +62,7 @@ func (l *LeaderConnectionProvider) getClientByName(currentLeaderName string) (*g
leaderConnectionDetails := l.leaderConfig.LeaderConnection
leaderConnectionDetails.ArmadaUrl = strings.ReplaceAll(leaderConnectionDetails.ArmadaUrl, leaseHolderNameToken, currentLeaderName)

apiConnection, err := createApiConnection(leaderConnectionDetails)
apiConnection, err := createApiConnection(leaderConnectionDetails, l.metrics)
if err != nil {
return nil, errors.Wrapf(err, "error creating connection to leader")
}
Expand All @@ -69,12 +71,11 @@ func (l *LeaderConnectionProvider) getClientByName(currentLeaderName string) (*g
return apiConnection, nil
}

// createApiConnection opens a gRPC client connection for connectionDetails
// through client.CreateApiConnectionWithCallOptions, with no call options and
// with unary and stream client interceptors chained in.
//
// NOTE: this span is a rendered diff without +/- markers. The first signature
// line and the EnableClientHandlingTimeHistogram call are the lines removed by
// the commit; the second signature line, which takes the clientMetrics
// collector from the caller, is the one added.
func createApiConnection(connectionDetails client.ApiConnectionDetails) (*grpc.ClientConn, error) {
grpc_prometheus.EnableClientHandlingTimeHistogram()
func createApiConnection(connectionDetails client.ApiConnectionDetails, clientMetrics *grpc_prometheus.ClientMetrics) (*grpc.ClientConn, error) {
return client.CreateApiConnectionWithCallOptions(
&connectionDetails,
[]grpc.CallOption{},
// Removed by this commit: the package-level client interceptors.
grpc.WithChainUnaryInterceptor(grpc_prometheus.UnaryClientInterceptor),
grpc.WithChainStreamInterceptor(grpc_prometheus.StreamClientInterceptor),
// Added by this commit: interceptors obtained from the clientMetrics
// argument. This function does not register clientMetrics itself.
grpc.WithChainUnaryInterceptor(clientMetrics.UnaryClientInterceptor()),
grpc.WithChainStreamInterceptor(clientMetrics.StreamClientInterceptor()),
)
}
9 changes: 5 additions & 4 deletions internal/scheduler/leader/leader_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package leader
import (
"testing"

grpc_prometheus "github.com/grpc-ecosystem/go-grpc-middleware/providers/prometheus"
"github.com/stretchr/testify/assert"

"github.com/armadaproject/armada/internal/common/armadacontext"
Expand All @@ -28,7 +29,7 @@ var templatedLeader = configuration.LeaderConfig{

func TestGetCurrentLeaderClientConnection(t *testing.T) {
leaderController := &FakeLeaderController{}
clientProvider := NewLeaderConnectionProvider(leaderController, defaultLeaderConfig)
clientProvider := NewLeaderConnectionProvider(leaderController, defaultLeaderConfig, &grpc_prometheus.ClientMetrics{})
leaderController.LeaderName = "new-leader"

isCurrentProcessLeader, result, err := clientProvider.GetCurrentLeaderClientConnection()
Expand All @@ -40,7 +41,7 @@ func TestGetCurrentLeaderClientConnection(t *testing.T) {

func TestGetCurrentLeaderClientConnection_WithTemplatedConnection(t *testing.T) {
leaderController := &FakeLeaderController{}
clientProvider := NewLeaderConnectionProvider(leaderController, templatedLeader)
clientProvider := NewLeaderConnectionProvider(leaderController, templatedLeader, &grpc_prometheus.ClientMetrics{})

leaderController.LeaderName = "new-leader"
isCurrentProcessLeader, result, err := clientProvider.GetCurrentLeaderClientConnection()
Expand All @@ -59,7 +60,7 @@ func TestGetCurrentLeaderClientConnection_WithTemplatedConnection(t *testing.T)

func TestGetCurrentLeaderClientConnection_NoLeader(t *testing.T) {
leaderController := &FakeLeaderController{}
clientProvider := NewLeaderConnectionProvider(leaderController, defaultLeaderConfig)
clientProvider := NewLeaderConnectionProvider(leaderController, defaultLeaderConfig, &grpc_prometheus.ClientMetrics{})

isCurrentProcessLeader, result, err := clientProvider.GetCurrentLeaderClientConnection()
assert.Nil(t, result)
Expand All @@ -69,7 +70,7 @@ func TestGetCurrentLeaderClientConnection_NoLeader(t *testing.T) {

func TestGetCurrentLeaderClientConnection_OnCurrentProcessIsLeader(t *testing.T) {
leaderController := &FakeLeaderController{}
clientProvider := NewLeaderConnectionProvider(leaderController, defaultLeaderConfig)
clientProvider := NewLeaderConnectionProvider(leaderController, defaultLeaderConfig, &grpc_prometheus.ClientMetrics{})
leaderController.IsCurrentlyLeader = true

isCurrentProcessLeader, result, err := clientProvider.GetCurrentLeaderClientConnection()
Expand Down
8 changes: 7 additions & 1 deletion internal/scheduler/schedulerapp.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (

"github.com/apache/pulsar-client-go/pulsar"
"github.com/google/uuid"
grpc_prometheus "github.com/grpc-ecosystem/go-grpc-middleware/providers/prometheus"
grpc_logging "github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/logging"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
Expand Down Expand Up @@ -224,7 +225,12 @@ func Run(config schedulerconfig.Configuration) error {
schedulingContextRepository := reports.NewSchedulingContextRepository()
reportServer := reports.NewServer(schedulingContextRepository)

leaderClientConnectionProvider := leader.NewLeaderConnectionProvider(leaderController, config.Leader)
clientMetrics := grpc_prometheus.NewClientMetrics(
grpc_prometheus.WithClientHandlingTimeHistogram(),
)
prometheus.MustRegister(clientMetrics)

leaderClientConnectionProvider := leader.NewLeaderConnectionProvider(leaderController, config.Leader, clientMetrics)
schedulingSchedulerReportingServer := reports.NewLeaderProxyingSchedulingReportsServer(reportServer, leaderClientConnectionProvider)
schedulerobjects.RegisterSchedulerReportingServer(grpcServer, schedulingSchedulerReportingServer)

Expand Down
12 changes: 7 additions & 5 deletions internal/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (

"github.com/apache/pulsar-client-go/pulsar"
"github.com/google/uuid"
grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
grpc_prometheus "github.com/grpc-ecosystem/go-grpc-middleware/providers/prometheus"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/redis/go-redis/extra/redisprometheus/v9"
Expand Down Expand Up @@ -194,7 +194,6 @@ func Serve(ctx *armadacontext.Context, config *configuration.ArmadaConfig, healt
api.RegisterExecutorServer(grpcServer, executorServer)

schedulerobjects.RegisterSchedulerReportingServer(grpcServer, schedulingReportsServer)
grpc_prometheus.Register(grpcServer)

// Cancel the errgroup if grpcServer.Serve returns an error.
log.Infof("Armada gRPC server listening on %d", config.GrpcPort)
Expand Down Expand Up @@ -235,11 +234,14 @@ func validateSubmissionConfig(config configuration.SubmissionConfig) error {
}

// createApiConnection opens a gRPC client connection for connectionDetails
// through client.CreateApiConnectionWithCallOptions, with no call options and
// with unary and stream client interceptors chained in.
//
// NOTE: this span is a rendered diff without +/- markers, so the lines removed
// and the lines added by the commit are interleaved; it is not compilable as
// shown. The inline comments below mark which side each changed line is on.
func createApiConnection(connectionDetails client.ApiConnectionDetails) (*grpc.ClientConn, error) {
// Removed by this commit: package-level switch from the old
// go-grpc-prometheus import.
grpc_prometheus.EnableClientHandlingTimeHistogram()
// Added by this commit: an explicit ClientMetrics collector with the
// handling-time histogram option, registered via prometheus.MustRegister.
clientMetrics := grpc_prometheus.NewClientMetrics(
grpc_prometheus.WithClientHandlingTimeHistogram(),
)
// NOTE(review): MustRegister panics when registration fails. A new collector
// is built and registered on every call, so a second call in the same
// process would presumably hit a duplicate-registration panic — confirm this
// function is only called once.
prometheus.MustRegister(clientMetrics)
return client.CreateApiConnectionWithCallOptions(
&connectionDetails,
[]grpc.CallOption{},
// Removed by this commit: the package-level client interceptors.
grpc.WithChainUnaryInterceptor(grpc_prometheus.UnaryClientInterceptor),
grpc.WithChainStreamInterceptor(grpc_prometheus.StreamClientInterceptor),
// Added by this commit: interceptors obtained from the clientMetrics
// instance created above.
grpc.WithChainUnaryInterceptor(clientMetrics.UnaryClientInterceptor()),
grpc.WithChainStreamInterceptor(clientMetrics.StreamClientInterceptor()),
)
}

0 comments on commit a5afc8a

Please sign in to comment.