diff --git a/cmd/maestro/server/grpc_server.go b/cmd/maestro/server/grpc_server.go index e58ee348..d353e72c 100644 --- a/cmd/maestro/server/grpc_server.go +++ b/cmd/maestro/server/grpc_server.go @@ -49,8 +49,14 @@ func NewGRPCServer(resourceService services.ResourceService, eventBroadcaster *e grpcServerOptions = append(grpcServerOptions, grpc.ConnectionTimeout(config.ConnectionTimeout)) grpcServerOptions = append(grpcServerOptions, grpc.WriteBufferSize(config.WriteBufferSize)) grpcServerOptions = append(grpcServerOptions, grpc.ReadBufferSize(config.ReadBufferSize)) + grpcServerOptions = append(grpcServerOptions, grpc.KeepaliveEnforcementPolicy(keepalive.EnforcementPolicy{ + MinTime: config.ClientMinPingInterval, + PermitWithoutStream: config.PermitPingWithoutStream, + })) grpcServerOptions = append(grpcServerOptions, grpc.KeepaliveParams(keepalive.ServerParameters{ MaxConnectionAge: config.MaxConnectionAge, + Time: config.ServerPingInterval, + Timeout: config.ServerPingTimeout, })) if config.EnableTLS { diff --git a/go.mod b/go.mod index 5f926076..b84d81f0 100755 --- a/go.mod +++ b/go.mod @@ -165,3 +165,5 @@ require ( sigs.k8s.io/structured-merge-diff/v4 v4.4.1 // indirect sigs.k8s.io/yaml v1.4.0 // indirect ) + +replace open-cluster-management.io/sdk-go => github.com/morvencao/ocm-sdk-go v0.0.0-20240719054854-a512ec8da872 diff --git a/go.sum b/go.sum index 1d700872..9b5c1bc3 100755 --- a/go.sum +++ b/go.sum @@ -392,6 +392,8 @@ github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9G github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk= github.com/morikuni/aec v1.0.0 h1:nP9CBfwrvYnBRgY6qfDQkygYDmYwOilePFkwzv4dU8A= github.com/morikuni/aec v1.0.0/go.mod h1:BbKIizmSmc5MMPqRYbxO4ZU0S0+P200+tUnFx7PXmsc= +github.com/morvencao/ocm-sdk-go v0.0.0-20240719054854-a512ec8da872 h1:s4l33rPOPdUcRt4CBkow3FpMkDj/eCuqiVTWN8Ma0ZI= +github.com/morvencao/ocm-sdk-go v0.0.0-20240719054854-a512ec8da872/go.mod h1:xFmN3Db5nN68oLGnstmIRv4us8HJCdXFnBNMXVp0jWY= github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA= github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ= github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U= @@ -825,8 +827,6 @@ open-cluster-management.io/api v0.14.1-0.20240627145512-bd6f2229b53c h1:gYfgkX/U open-cluster-management.io/api v0.14.1-0.20240627145512-bd6f2229b53c/go.mod h1:9erZEWEn4bEqh0nIX2wA7f/s3KCuFycQdBrPrRzi0QM= open-cluster-management.io/ocm v0.13.1-0.20240618054845-e2a7b9e78b33 h1:7uPjyn1x25QZIzfZqeSFfZdNrzc2hlHm6t/JKYKu9fI= open-cluster-management.io/ocm v0.13.1-0.20240618054845-e2a7b9e78b33/go.mod h1:KzUwhPZAg6Wq+4xRu10fVVpqNADyz5CtRW4ziqIC2z4= -open-cluster-management.io/sdk-go v0.14.1-0.20240717021054-955108a181ee h1:aQ4AoR8SKz/byOyZbbYC9Tbp4VCtRHje8uHbn438o84= -open-cluster-management.io/sdk-go v0.14.1-0.20240717021054-955108a181ee/go.mod h1:xFmN3Db5nN68oLGnstmIRv4us8HJCdXFnBNMXVp0jWY= sigs.k8s.io/apiserver-network-proxy/konnectivity-client v0.29.0 h1:/U5vjBbQn3RChhv7P11uhYvCSm5G2GaIi5AIGBS6r4c= sigs.k8s.io/apiserver-network-proxy/konnectivity-client v0.29.0/go.mod h1:z7+wmGM2dfIiLRfrC6jb5kV2Mq/sK1ZP303cxzkV5Y4= sigs.k8s.io/controller-runtime v0.18.4 h1:87+guW1zhvuPLh1PHybKdYFLU0YJp4FhJRmiHvm5BZw= diff --git a/pkg/config/grpc_server.go b/pkg/config/grpc_server.go index 592bbd94..cb903fcf 100755 --- a/pkg/config/grpc_server.go +++ b/pkg/config/grpc_server.go @@ -21,6 +21,10 @@ type GRPCServerConfig struct { WriteBufferSize int `json:"write_buffer_size"` ReadBufferSize int `json:"read_buffer_size"` MaxConnectionAge time.Duration `json:"max_connection_age"` + ClientMinPingInterval time.Duration `json:"client_min_ping_interval"` + ServerPingInterval time.Duration `json:"server_ping_interval"` + ServerPingTimeout time.Duration `json:"server_ping_timeout"` + PermitPingWithoutStream bool `json:"permit_ping_without_stream"` } func NewGRPCServerConfig() *GRPCServerConfig { @@ -36,6 +40,10 @@ func (s *GRPCServerConfig) AddFlags(fs *pflag.FlagSet) { fs.IntVar(&s.MaxSendMessageSize, "grpc-max-send-message-size", math.MaxInt32, "gPRC max send message size") fs.DurationVar(&s.ConnectionTimeout, "grpc-connection-timeout", 120*time.Second, "gPRC connection timeout") fs.DurationVar(&s.MaxConnectionAge, "grpc-max-connection-age", time.Duration(math.MaxInt64), "A duration for the maximum amount of time connection may exist before closing") + fs.DurationVar(&s.ClientMinPingInterval, "grpc-client-min-ping-interval", 5*time.Second, "Server will terminate the connection if the client pings more than once within this duration") + fs.DurationVar(&s.ServerPingInterval, "grpc-server-ping-interval", 30*time.Second, "Duration after which the server pings the client if no activity is detected") + fs.DurationVar(&s.ServerPingTimeout, "grpc-server-ping-timeout", 10*time.Second, "Duration the client waits for a response after sending a keepalive ping") + fs.BoolVar(&s.PermitPingWithoutStream, "permit-ping-without-stream", false, "Allow keepalive pings even when there are no active streams") fs.IntVar(&s.WriteBufferSize, "grpc-write-buffer-size", 32*1024, "gPRC write buffer size") fs.IntVar(&s.ReadBufferSize, "grpc-read-buffer-size", 32*1024, "gPRC read buffer size") fs.StringVar(&s.TLSCertFile, "grpc-tls-cert-file", "", "The path to the tls.crt file") diff --git a/test/e2e/pkg/suite_test.go b/test/e2e/pkg/suite_test.go index dc185781..92dc3ced 100644 --- a/test/e2e/pkg/suite_test.go +++ b/test/e2e/pkg/suite_test.go @@ -92,6 +92,9 @@ var _ = BeforeSuite(func() { sourceID = "sourceclient-test" + rand.String(5) grpcOptions = grpcoptions.NewGRPCOptions() grpcOptions.URL = grpcServerAddress + grpcOptions.KeepAliveOptions.Enable = true + grpcOptions.KeepAliveOptions.Time = 6 * time.Second + grpcOptions.KeepAliveOptions.Timeout = 1 * time.Second workClient, err = grpcsource.NewMaestroGRPCSourceWorkClient( ctx,