diff --git a/server/grpc_service.go b/server/grpc_service.go index 48ac9875415..40c36e35ef7 100644 --- a/server/grpc_service.go +++ b/server/grpc_service.go @@ -59,6 +59,7 @@ var ( ErrNotStarted = status.Errorf(codes.Unavailable, "server not started") ErrSendHeartbeatTimeout = status.Errorf(codes.DeadlineExceeded, "send heartbeat timeout") ErrNotFoundTSOAddr = status.Errorf(codes.NotFound, "not found tso address") + ErrEtcdNotStarted = status.Errorf(codes.Unavailable, "server is started, but etcd not started") ) // GrpcServer wraps Server to provide grpc service. @@ -1835,6 +1836,9 @@ const globalConfigPath = "/global/config/" // Since item value needs to support marshal of different struct types, // it should be set to `Payload bytes` instead of `Value string` func (s *GrpcServer) StoreGlobalConfig(_ context.Context, request *pdpb.StoreGlobalConfigRequest) (*pdpb.StoreGlobalConfigResponse, error) { + if s.client == nil { + return nil, ErrEtcdNotStarted + } configPath := request.GetConfigPath() if configPath == "" { configPath = globalConfigPath @@ -1869,6 +1873,9 @@ func (s *GrpcServer) StoreGlobalConfig(_ context.Context, request *pdpb.StoreGlo // - `Names` iteratively get value from `ConfigPath/Name` but not care about revision // - `ConfigPath` if `Names` is nil can get all values and revision of current path func (s *GrpcServer) LoadGlobalConfig(ctx context.Context, request *pdpb.LoadGlobalConfigRequest) (*pdpb.LoadGlobalConfigResponse, error) { + if s.client == nil { + return nil, ErrEtcdNotStarted + } configPath := request.GetConfigPath() if configPath == "" { configPath = globalConfigPath @@ -1905,6 +1912,9 @@ func (s *GrpcServer) LoadGlobalConfig(ctx context.Context, request *pdpb.LoadGlo // by Etcd.Watch() as long as the context has not been canceled or timed out. // Watch on revision which greater than or equal to the required revision. func (s *GrpcServer) WatchGlobalConfig(req *pdpb.WatchGlobalConfigRequest, server pdpb.PD_WatchGlobalConfigServer) error { + if s.client == nil { + return ErrEtcdNotStarted + } ctx, cancel := context.WithCancel(s.Context()) defer cancel() configPath := req.GetConfigPath() diff --git a/server/server.go b/server/server.go index 36a3cefd0af..faade6c64f8 100644 --- a/server/server.go +++ b/server/server.go @@ -1961,3 +1961,9 @@ func (s *Server) GetTSOUpdatePhysicalInterval() time.Duration { func (s *Server) GetMaxResetTSGap() time.Duration { return s.persistOptions.GetMaxResetTSGap() } + +// SetClient sets the etcd client. +// Notes: it is only used for test. +func (s *Server) SetClient(client *clientv3.Client) { + s.client = client +} diff --git a/tests/integrations/client/global_config_test.go b/tests/integrations/client/global_config_test.go index a81e98b9ad2..15034d035a6 100644 --- a/tests/integrations/client/global_config_test.go +++ b/tests/integrations/client/global_config_test.go @@ -17,6 +17,7 @@ package client_test import ( "path" "strconv" + "sync" "testing" "time" @@ -53,6 +54,7 @@ type globalConfigTestSuite struct { server *server.GrpcServer client pd.Client cleanup testutil.CleanupFunc + mu sync.Mutex } func TestGlobalConfigTestSuite(t *testing.T) { @@ -322,3 +324,31 @@ func (suite *globalConfigTestSuite) TestClientWatchWithRevision() { } } } + +func (suite *globalConfigTestSuite) TestEtcdNotStart() { + cli := suite.server.GetClient() + defer func() { + suite.mu.Lock() + suite.server.SetClient(cli) + suite.mu.Unlock() + }() + suite.mu.Lock() + suite.server.SetClient(nil) + suite.mu.Unlock() + err := suite.server.WatchGlobalConfig(&pdpb.WatchGlobalConfigRequest{ + ConfigPath: globalConfigPath, + Revision: 0, + }, nil) + suite.Error(err) + + _, err = suite.server.StoreGlobalConfig(suite.server.Context(), &pdpb.StoreGlobalConfigRequest{ + ConfigPath: globalConfigPath, + Changes: []*pdpb.GlobalConfigItem{{Kind: pdpb.EventType_PUT, Name: "0", Payload: []byte("0")}}, + }) + suite.Error(err) + + _, err = suite.server.LoadGlobalConfig(suite.server.Context(), &pdpb.LoadGlobalConfigRequest{ + Names: []string{"test_etcd"}, + }) + suite.Error(err) +}