Skip to content

Commit

Permalink
Enable client side gRPC health check by default
Browse files Browse the repository at this point in the history
Signed-off-by: Benjamin Wang <[email protected]>
  • Loading branch information
ahrtr committed Nov 12, 2024
1 parent 7ab7612 commit af21881
Show file tree
Hide file tree
Showing 3 changed files with 12 additions and 45 deletions.
2 changes: 1 addition & 1 deletion client/v3/internal/resolver/resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func New(endpoints ...string) *EtcdManualResolver {

// Build returns itself for Resolver, because it's both a builder and a resolver.
func (r *EtcdManualResolver) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) {
r.serviceConfig = cc.ParseServiceConfig(`{"loadBalancingPolicy": "round_robin"}`)
r.serviceConfig = cc.ParseServiceConfig(`{"loadBalancingPolicy": "round_robin", "healthCheckConfig": {"serviceName": ""}}`)
if r.serviceConfig.Err != nil {
return nil, r.serviceConfig.Err
}
Expand Down
12 changes: 9 additions & 3 deletions server/etcdserver/api/v3rpc/interceptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,9 @@ import (
)

const (
maxNoLeaderCnt = 3
snapshotMethod = "/etcdserverpb.Maintenance/Snapshot"
maxNoLeaderCnt = 3
snapshotMethod = "/etcdserverpb.Maintenance/Snapshot"
gRPCHealthWatch = "/grpc.health.v1.Health/Watch"
)

type streamsMap struct {
Expand Down Expand Up @@ -218,7 +219,7 @@ func newStreamInterceptor(s *etcdserver.EtcdServer) grpc.StreamServerInterceptor
return rpctypes.ErrGRPCNotCapable
}

if s.IsMemberExist(s.MemberID()) && s.IsLearner() && info.FullMethod != snapshotMethod { // learner does not support stream RPC except Snapshot
if s.IsMemberExist(s.MemberID()) && s.IsLearner() && !isRPCStreamSupportForLearner(info) { // learner does not support stream RPC except Snapshot
return rpctypes.ErrGRPCNotSupportedForLearner
}

Expand Down Expand Up @@ -259,6 +260,11 @@ func newStreamInterceptor(s *etcdserver.EtcdServer) grpc.StreamServerInterceptor
}
}

// learner does not support stream RPC except Snapshot and gRPC Health Watch
func isRPCStreamSupportForLearner(info *grpc.StreamServerInfo) bool {
return info.FullMethod == snapshotMethod || info.FullMethod == gRPCHealthWatch
}

// cancellableContext wraps a context with new cancellable context that allows a
// specific cancellation error to be preserved and later retrieved using the
// Context.Err() function. This is so downstream context users can disambiguate
Expand Down
43 changes: 2 additions & 41 deletions tests/e2e/failover_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (

"github.com/stretchr/testify/require"
"golang.org/x/sync/errgroup"
"google.golang.org/grpc"
_ "google.golang.org/grpc/health"

clientv3 "go.etcd.io/etcd/client/v3"
Expand All @@ -44,9 +43,8 @@ const (

func TestFailoverOnDefrag(t *testing.T) {
tcs := []struct {
name string
clusterOptions []e2e.EPClusterOption
gRPCDialOptions []grpc.DialOption
name string
clusterOptions []e2e.EPClusterOption

// common assertion
expectedMinQPS float64
Expand All @@ -62,10 +60,6 @@ func TestFailoverOnDefrag(t *testing.T) {
e2e.WithExperimentalStopGRPCServiceOnDefrag(true),
e2e.WithGoFailEnabled(true),
},
gRPCDialOptions: []grpc.DialOption{
grpc.WithDisableServiceConfig(),
grpc.WithDefaultServiceConfig(`{"loadBalancingPolicy": "round_robin", "healthCheckConfig": {"serviceName": ""}}`),
},
expectedMinQPS: 20,
expectedMaxFailureRate: 0.01,
},
Expand All @@ -76,20 +70,6 @@ func TestFailoverOnDefrag(t *testing.T) {
e2e.WithExperimentalStopGRPCServiceOnDefrag(false),
e2e.WithGoFailEnabled(true),
},
gRPCDialOptions: []grpc.DialOption{
grpc.WithDisableServiceConfig(),
grpc.WithDefaultServiceConfig(`{"loadBalancingPolicy": "round_robin", "healthCheckConfig": {"serviceName": ""}}`),
},
expectedMinQPS: 20,
expectedMinFailureRate: 0.25,
},
{
name: "defrag blocks one-third of requests with stopGRPCServiceOnDefrag set to true and client health check disabled",
clusterOptions: []e2e.EPClusterOption{
e2e.WithClusterSize(3),
e2e.WithExperimentalStopGRPCServiceOnDefrag(true),
e2e.WithGoFailEnabled(true),
},
expectedMinQPS: 20,
expectedMinFailureRate: 0.25,
},
Expand All @@ -100,10 +80,6 @@ func TestFailoverOnDefrag(t *testing.T) {
e2e.WithServerFeatureGate("StopGRPCServiceOnDefrag", true),
e2e.WithGoFailEnabled(true),
},
gRPCDialOptions: []grpc.DialOption{
grpc.WithDisableServiceConfig(),
grpc.WithDefaultServiceConfig(`{"loadBalancingPolicy": "round_robin", "healthCheckConfig": {"serviceName": ""}}`),
},
expectedMinQPS: 20,
expectedMaxFailureRate: 0.01,
},
Expand All @@ -114,20 +90,6 @@ func TestFailoverOnDefrag(t *testing.T) {
e2e.WithServerFeatureGate("StopGRPCServiceOnDefrag", false),
e2e.WithGoFailEnabled(true),
},
gRPCDialOptions: []grpc.DialOption{
grpc.WithDisableServiceConfig(),
grpc.WithDefaultServiceConfig(`{"loadBalancingPolicy": "round_robin", "healthCheckConfig": {"serviceName": ""}}`),
},
expectedMinQPS: 20,
expectedMinFailureRate: 0.25,
},
{
name: "defrag blocks one-third of requests with StopGRPCServiceOnDefrag feature gate set to true and client health check disabled",
clusterOptions: []e2e.EPClusterOption{
e2e.WithClusterSize(3),
e2e.WithServerFeatureGate("StopGRPCServiceOnDefrag", true),
e2e.WithGoFailEnabled(true),
},
expectedMinQPS: 20,
expectedMinFailureRate: 0.25,
},
Expand All @@ -151,7 +113,6 @@ func TestFailoverOnDefrag(t *testing.T) {
DialKeepAliveTime: keepaliveTime,
DialKeepAliveTimeout: keepaliveTimeout,
Endpoints: endpoints,
DialOptions: tc.gRPCDialOptions,
})
if cerr != nil {
return cerr
Expand Down

0 comments on commit af21881

Please sign in to comment.