From af218815a29f377f9b1c604b4c0a52036c4e4170 Mon Sep 17 00:00:00 2001 From: Benjamin Wang Date: Tue, 12 Nov 2024 10:54:47 +0000 Subject: [PATCH] Enable client side gRPC health check by default Signed-off-by: Benjamin Wang --- client/v3/internal/resolver/resolver.go | 2 +- server/etcdserver/api/v3rpc/interceptor.go | 12 ++++-- tests/e2e/failover_test.go | 43 +--------------------- 3 files changed, 12 insertions(+), 45 deletions(-) diff --git a/client/v3/internal/resolver/resolver.go b/client/v3/internal/resolver/resolver.go index b5c9de00786..e83bde39837 100644 --- a/client/v3/internal/resolver/resolver.go +++ b/client/v3/internal/resolver/resolver.go @@ -41,7 +41,7 @@ func New(endpoints ...string) *EtcdManualResolver { // Build returns itself for Resolver, because it's both a builder and a resolver. func (r *EtcdManualResolver) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) { - r.serviceConfig = cc.ParseServiceConfig(`{"loadBalancingPolicy": "round_robin"}`) + r.serviceConfig = cc.ParseServiceConfig(`{"loadBalancingPolicy": "round_robin", "healthCheckConfig": {"serviceName": ""}}`) if r.serviceConfig.Err != nil { return nil, r.serviceConfig.Err } diff --git a/server/etcdserver/api/v3rpc/interceptor.go b/server/etcdserver/api/v3rpc/interceptor.go index 65e068ebbfc..972af8c3929 100644 --- a/server/etcdserver/api/v3rpc/interceptor.go +++ b/server/etcdserver/api/v3rpc/interceptor.go @@ -34,8 +34,9 @@ import ( ) const ( - maxNoLeaderCnt = 3 - snapshotMethod = "/etcdserverpb.Maintenance/Snapshot" + maxNoLeaderCnt = 3 + snapshotMethod = "/etcdserverpb.Maintenance/Snapshot" + gRPCHealthWatch = "/grpc.health.v1.Health/Watch" ) type streamsMap struct { @@ -218,7 +219,7 @@ func newStreamInterceptor(s *etcdserver.EtcdServer) grpc.StreamServerInterceptor return rpctypes.ErrGRPCNotCapable } - if s.IsMemberExist(s.MemberID()) && s.IsLearner() && info.FullMethod != snapshotMethod { // learner does not support stream RPC except Snapshot + if s.IsMemberExist(s.MemberID()) && s.IsLearner() && !isRPCStreamSupportForLearner(info) { // learner does not support stream RPC except Snapshot return rpctypes.ErrGRPCNotSupportedForLearner } @@ -259,6 +260,11 @@ func newStreamInterceptor(s *etcdserver.EtcdServer) grpc.StreamServerInterceptor } } +// learner does not support stream RPC except Snapshot and gRPC Health Watch +func isRPCStreamSupportForLearner(info *grpc.StreamServerInfo) bool { + return info.FullMethod == snapshotMethod || info.FullMethod == gRPCHealthWatch +} + // cancellableContext wraps a context with new cancellable context that allows a // specific cancellation error to be preserved and later retrieved using the // Context.Err() function. This is so downstream context users can disambiguate diff --git a/tests/e2e/failover_test.go b/tests/e2e/failover_test.go index 87860367348..837c6d31355 100644 --- a/tests/e2e/failover_test.go +++ b/tests/e2e/failover_test.go @@ -23,7 +23,6 @@ import ( "github.com/stretchr/testify/require" "golang.org/x/sync/errgroup" - "google.golang.org/grpc" _ "google.golang.org/grpc/health" clientv3 "go.etcd.io/etcd/client/v3" @@ -44,9 +43,8 @@ const ( func TestFailoverOnDefrag(t *testing.T) { tcs := []struct { - name string - clusterOptions []e2e.EPClusterOption - gRPCDialOptions []grpc.DialOption + name string + clusterOptions []e2e.EPClusterOption // common assertion expectedMinQPS float64 @@ -62,10 +60,6 @@ func TestFailoverOnDefrag(t *testing.T) { e2e.WithExperimentalStopGRPCServiceOnDefrag(true), e2e.WithGoFailEnabled(true), }, - gRPCDialOptions: []grpc.DialOption{ - grpc.WithDisableServiceConfig(), - grpc.WithDefaultServiceConfig(`{"loadBalancingPolicy": "round_robin", "healthCheckConfig": {"serviceName": ""}}`), - }, expectedMinQPS: 20, expectedMaxFailureRate: 0.01, }, @@ -76,20 +70,6 @@ func TestFailoverOnDefrag(t *testing.T) { e2e.WithExperimentalStopGRPCServiceOnDefrag(false), e2e.WithGoFailEnabled(true), }, - gRPCDialOptions: []grpc.DialOption{ - grpc.WithDisableServiceConfig(), - grpc.WithDefaultServiceConfig(`{"loadBalancingPolicy": "round_robin", "healthCheckConfig": {"serviceName": ""}}`), - }, - expectedMinQPS: 20, - expectedMinFailureRate: 0.25, - }, - { - name: "defrag blocks one-third of requests with stopGRPCServiceOnDefrag set to true and client health check disabled", - clusterOptions: []e2e.EPClusterOption{ - e2e.WithClusterSize(3), - e2e.WithExperimentalStopGRPCServiceOnDefrag(true), - e2e.WithGoFailEnabled(true), - }, expectedMinQPS: 20, expectedMinFailureRate: 0.25, }, @@ -100,10 +80,6 @@ func TestFailoverOnDefrag(t *testing.T) { e2e.WithServerFeatureGate("StopGRPCServiceOnDefrag", true), e2e.WithGoFailEnabled(true), }, - gRPCDialOptions: []grpc.DialOption{ - grpc.WithDisableServiceConfig(), - grpc.WithDefaultServiceConfig(`{"loadBalancingPolicy": "round_robin", "healthCheckConfig": {"serviceName": ""}}`), - }, expectedMinQPS: 20, expectedMaxFailureRate: 0.01, }, @@ -114,20 +90,6 @@ func TestFailoverOnDefrag(t *testing.T) { e2e.WithServerFeatureGate("StopGRPCServiceOnDefrag", false), e2e.WithGoFailEnabled(true), }, - gRPCDialOptions: []grpc.DialOption{ - grpc.WithDisableServiceConfig(), - grpc.WithDefaultServiceConfig(`{"loadBalancingPolicy": "round_robin", "healthCheckConfig": {"serviceName": ""}}`), - }, - expectedMinQPS: 20, - expectedMinFailureRate: 0.25, - }, - { - name: "defrag blocks one-third of requests with StopGRPCServiceOnDefrag feature gate set to true and client health check disabled", - clusterOptions: []e2e.EPClusterOption{ - e2e.WithClusterSize(3), - e2e.WithServerFeatureGate("StopGRPCServiceOnDefrag", true), - e2e.WithGoFailEnabled(true), - }, expectedMinQPS: 20, expectedMinFailureRate: 0.25, }, @@ -151,7 +113,6 @@ func TestFailoverOnDefrag(t *testing.T) { DialKeepAliveTime: keepaliveTime, DialKeepAliveTimeout: keepaliveTimeout, Endpoints: endpoints, - DialOptions: tc.gRPCDialOptions, }) if cerr != nil { return cerr