diff --git a/server/grpc_service.go b/server/grpc_service.go index 7229b085d0b9..e067b846b9d9 100644 --- a/server/grpc_service.go +++ b/server/grpc_service.go @@ -60,9 +60,20 @@ const ( var ( // ErrNotLeader is returned when current server is not the leader and not possible to process request. // TODO: work as proxy. +<<<<<<< HEAD ErrNotLeader = status.Errorf(codes.Unavailable, "not leader") ErrNotStarted = status.Errorf(codes.Unavailable, "server not started") ErrSendHeartbeatTimeout = status.Errorf(codes.DeadlineExceeded, "send heartbeat timeout") +======= + ErrNotLeader = status.Errorf(codes.Unavailable, "not leader") + ErrNotStarted = status.Errorf(codes.Unavailable, "server not started") + ErrSendHeartbeatTimeout = status.Errorf(codes.DeadlineExceeded, "send heartbeat timeout") + ErrNotFoundTSOAddr = status.Errorf(codes.NotFound, "not found tso address") + ErrForwardTSOTimeout = status.Errorf(codes.DeadlineExceeded, "forward tso request timeout") + ErrMaxCountTSOProxyRoutinesExceeded = status.Errorf(codes.ResourceExhausted, "max count of concurrent tso proxy routines exceeded") + ErrTSOProxyRecvFromClientTimeout = status.Errorf(codes.DeadlineExceeded, "tso proxy timeout when receiving from client; stream closed by server") + ErrEtcdNotStarted = status.Errorf(codes.Unavailable, "server is started, but etcd not started") +>>>>>>> c97dfcb74 (global config: fix etcd client not found (#6866)) ) // GrpcServer wraps Server to provide grpc service. @@ -1896,6 +1907,16 @@ func checkStream(streamCtx context.Context, cancel context.CancelFunc, done chan // StoreGlobalConfig store global config into etcd by transaction func (s *GrpcServer) StoreGlobalConfig(_ context.Context, request *pdpb.StoreGlobalConfigRequest) (*pdpb.StoreGlobalConfigResponse, error) { +<<<<<<< HEAD +======= + if s.client == nil { + return nil, ErrEtcdNotStarted + } + configPath := request.GetConfigPath() + if configPath == "" { + configPath = globalConfigPath + } +>>>>>>> c97dfcb74 (global config: fix etcd client not found (#6866)) ops := make([]clientv3.Op, len(request.Changes)) for i, item := range request.Changes { name := globalConfigPath + item.GetName() @@ -1915,6 +1936,7 @@ func (s *GrpcServer) StoreGlobalConfig(_ context.Context, request *pdpb.StoreGlo // LoadGlobalConfig load global config from etcd func (s *GrpcServer) LoadGlobalConfig(ctx context.Context, request *pdpb.LoadGlobalConfigRequest) (*pdpb.LoadGlobalConfigResponse, error) { +<<<<<<< HEAD names := request.Names res := make([]*pdpb.GlobalConfigItem, len(names)) for i, name := range names { @@ -1926,15 +1948,48 @@ func (s *GrpcServer) LoadGlobalConfig(ctx context.Context, request *pdpb.LoadGlo res[i] = &pdpb.GlobalConfigItem{Name: name, Error: &pdpb.Error{Type: pdpb.ErrorType_GLOBAL_CONFIG_NOT_FOUND, Message: msg}} } else { res[i] = &pdpb.GlobalConfigItem{Name: name, Value: string(r.Kvs[0].Value)} +======= + if s.client == nil { + return nil, ErrEtcdNotStarted + } + configPath := request.GetConfigPath() + if configPath == "" { + configPath = globalConfigPath + } + // Since item value needs to support marshal of different struct types, + // it should be set to `Payload bytes` instead of `Value string`. + if request.Names != nil { + res := make([]*pdpb.GlobalConfigItem, len(request.Names)) + for i, name := range request.Names { + r, err := s.client.Get(ctx, path.Join(configPath, name)) + if err != nil { + res[i] = &pdpb.GlobalConfigItem{Name: name, Error: &pdpb.Error{Type: pdpb.ErrorType_UNKNOWN, Message: err.Error()}} + } else if len(r.Kvs) == 0 { + msg := "key " + name + " not found" + res[i] = &pdpb.GlobalConfigItem{Name: name, Error: &pdpb.Error{Type: pdpb.ErrorType_GLOBAL_CONFIG_NOT_FOUND, Message: msg}} + } else { + res[i] = &pdpb.GlobalConfigItem{Name: name, Payload: r.Kvs[0].Value, Kind: pdpb.EventType_PUT} + } +>>>>>>> c97dfcb74 (global config: fix etcd client not found (#6866)) } } return &pdpb.LoadGlobalConfigResponse{Items: res}, nil } +<<<<<<< HEAD // WatchGlobalConfig if the connection of WatchGlobalConfig is end // or stoped by whatever reason // just reconnect to it. func (s *GrpcServer) WatchGlobalConfig(_ *pdpb.WatchGlobalConfigRequest, server pdpb.PD_WatchGlobalConfigServer) error { +======= +// WatchGlobalConfig will retry on recoverable errors forever until reconnected +// by Etcd.Watch() as long as the context has not been canceled or timed out. +// Watch on revision which greater than or equal to the required revision. +func (s *GrpcServer) WatchGlobalConfig(req *pdpb.WatchGlobalConfigRequest, server pdpb.PD_WatchGlobalConfigServer) error { + if s.client == nil { + return ErrEtcdNotStarted + } +>>>>>>> c97dfcb74 (global config: fix etcd client not found (#6866)) ctx, cancel := context.WithCancel(s.Context()) defer cancel() err := s.sendAllGlobalConfig(ctx, server) diff --git a/server/server.go b/server/server.go index 1ef364f79ab7..4e76cbf6a3cb 100644 --- a/server/server.go +++ b/server/server.go @@ -1770,3 +1770,48 @@ func (s *Server) SetExternalTS(externalTS uint64) error { s.GetRaftCluster().SetExternalTS(externalTS) return nil } +<<<<<<< HEAD +======= + +// IsLocalTSOEnabled returns if the local TSO is enabled. +func (s *Server) IsLocalTSOEnabled() bool { + return s.cfg.IsLocalTSOEnabled() +} + +// GetMaxConcurrentTSOProxyStreamings returns the max concurrent TSO proxy streamings. +// If the value is negative, there is no limit. +func (s *Server) GetMaxConcurrentTSOProxyStreamings() int { + return s.cfg.GetMaxConcurrentTSOProxyStreamings() +} + +// GetTSOProxyRecvFromClientTimeout returns timeout value for TSO proxy receiving from the client. +func (s *Server) GetTSOProxyRecvFromClientTimeout() time.Duration { + return s.cfg.GetTSOProxyRecvFromClientTimeout() +} + +// GetLeaderLease returns the leader lease. +func (s *Server) GetLeaderLease() int64 { + return s.cfg.GetLeaderLease() +} + +// GetTSOSaveInterval returns TSO save interval. +func (s *Server) GetTSOSaveInterval() time.Duration { + return s.cfg.GetTSOSaveInterval() +} + +// GetTSOUpdatePhysicalInterval returns TSO update physical interval. +func (s *Server) GetTSOUpdatePhysicalInterval() time.Duration { + return s.cfg.GetTSOUpdatePhysicalInterval() +} + +// GetMaxResetTSGap gets the max gap to reset the tso. +func (s *Server) GetMaxResetTSGap() time.Duration { + return s.persistOptions.GetMaxResetTSGap() +} + +// SetClient sets the etcd client. +// Notes: it is only used for test. +func (s *Server) SetClient(client *clientv3.Client) { + s.client = client +} +>>>>>>> c97dfcb74 (global config: fix etcd client not found (#6866)) diff --git a/tests/integrations/client/global_config_test.go b/tests/integrations/client/global_config_test.go new file mode 100644 index 000000000000..15034d035a69 --- /dev/null +++ b/tests/integrations/client/global_config_test.go @@ -0,0 +1,354 @@ +// Copyright 2023 TiKV Project Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package client_test + +import ( + "path" + "strconv" + "sync" + "testing" + "time" + + pd "github.com/tikv/pd/client" + + "github.com/pingcap/kvproto/pkg/pdpb" + "github.com/pingcap/log" + "github.com/stretchr/testify/require" + "github.com/stretchr/testify/suite" + "github.com/tikv/pd/pkg/utils/assertutil" + "github.com/tikv/pd/pkg/utils/testutil" + "github.com/tikv/pd/server" + "go.uber.org/zap" + "google.golang.org/grpc" +) + +const globalConfigPath = "/global/config/" + +type testReceiver struct { + re *require.Assertions + grpc.ServerStream +} + +func (s testReceiver) Send(m *pdpb.WatchGlobalConfigResponse) error { + log.Info("received", zap.Any("received", m.GetChanges())) + for _, change := range m.GetChanges() { + s.re.Contains(change.Name, globalConfigPath+string(change.Payload)) + } + return nil +} + +type globalConfigTestSuite struct { + suite.Suite + server *server.GrpcServer + client pd.Client + cleanup testutil.CleanupFunc + mu sync.Mutex +} + +func TestGlobalConfigTestSuite(t *testing.T) { + suite.Run(t, new(globalConfigTestSuite)) +} + +func (suite *globalConfigTestSuite) SetupSuite() { + var err error + var gsi *server.Server + checker := assertutil.NewChecker() + checker.FailNow = func() {} + gsi, suite.cleanup, err = server.NewTestServer(suite.Require(), checker) + suite.server = &server.GrpcServer{Server: gsi} + suite.NoError(err) + addr := suite.server.GetAddr() + suite.client, err = pd.NewClientWithContext(suite.server.Context(), []string{addr}, pd.SecurityOption{}) + suite.NoError(err) +} + +func (suite *globalConfigTestSuite) TearDownSuite() { + suite.client.Close() + suite.cleanup() +} + +func (suite *globalConfigTestSuite) GetEtcdPath(configPath string) string { + return globalConfigPath + configPath +} + +func (suite *globalConfigTestSuite) TestLoadWithoutNames() { + defer func() { + // clean up + _, err := suite.server.GetClient().Delete(suite.server.Context(), suite.GetEtcdPath("test")) + suite.NoError(err) + }() + r, err := suite.server.GetClient().Put(suite.server.Context(), suite.GetEtcdPath("test"), "test") + suite.NoError(err) + res, err := suite.server.LoadGlobalConfig(suite.server.Context(), &pdpb.LoadGlobalConfigRequest{ + ConfigPath: globalConfigPath, + }) + suite.NoError(err) + suite.Len(res.Items, 1) + suite.Equal(r.Header.GetRevision(), res.Revision) + suite.Equal("test", string(res.Items[0].Payload)) +} + +func (suite *globalConfigTestSuite) TestLoadWithoutConfigPath() { + defer func() { + // clean up + _, err := suite.server.GetClient().Delete(suite.server.Context(), suite.GetEtcdPath("source_id")) + suite.NoError(err) + }() + _, err := suite.server.GetClient().Put(suite.server.Context(), suite.GetEtcdPath("source_id"), "1") + suite.NoError(err) + res, err := suite.server.LoadGlobalConfig(suite.server.Context(), &pdpb.LoadGlobalConfigRequest{ + Names: []string{"source_id"}, + }) + suite.NoError(err) + suite.Len(res.Items, 1) + suite.Equal([]byte("1"), res.Items[0].Payload) +} + +func (suite *globalConfigTestSuite) TestLoadOtherConfigPath() { + defer func() { + for i := 0; i < 3; i++ { + _, err := suite.server.GetClient().Delete(suite.server.Context(), suite.GetEtcdPath(strconv.Itoa(i))) + suite.NoError(err) + } + }() + for i := 0; i < 3; i++ { + _, err := suite.server.GetClient().Put(suite.server.Context(), path.Join("OtherConfigPath", strconv.Itoa(i)), strconv.Itoa(i)) + suite.NoError(err) + } + res, err := suite.server.LoadGlobalConfig(suite.server.Context(), &pdpb.LoadGlobalConfigRequest{ + Names: []string{"0", "1"}, + ConfigPath: "OtherConfigPath", + }) + suite.NoError(err) + suite.Len(res.Items, 2) + for i, item := range res.Items { + suite.Equal(&pdpb.GlobalConfigItem{Kind: pdpb.EventType_PUT, Name: strconv.Itoa(i), Payload: []byte(strconv.Itoa(i))}, item) + } +} + +func (suite *globalConfigTestSuite) TestLoadAndStore() { + defer func() { + for i := 0; i < 3; i++ { + _, err := suite.server.GetClient().Delete(suite.server.Context(), suite.GetEtcdPath("test")) + suite.NoError(err) + } + }() + changes := []*pdpb.GlobalConfigItem{{Kind: pdpb.EventType_PUT, Name: "0", Payload: []byte("0")}, {Kind: pdpb.EventType_PUT, Name: "1", Payload: []byte("1")}, {Kind: pdpb.EventType_PUT, Name: "2", Payload: []byte("2")}} + _, err := suite.server.StoreGlobalConfig(suite.server.Context(), &pdpb.StoreGlobalConfigRequest{ + ConfigPath: globalConfigPath, + Changes: changes, + }) + suite.NoError(err) + res, err := suite.server.LoadGlobalConfig(suite.server.Context(), &pdpb.LoadGlobalConfigRequest{ + ConfigPath: globalConfigPath, + }) + suite.Len(res.Items, 3) + suite.NoError(err) + for i, item := range res.Items { + suite.Equal(&pdpb.GlobalConfigItem{Kind: pdpb.EventType_PUT, Name: suite.GetEtcdPath(strconv.Itoa(i)), Payload: []byte(strconv.Itoa(i))}, item) + } +} + +func (suite *globalConfigTestSuite) TestStore() { + defer func() { + for i := 0; i < 3; i++ { + _, err := suite.server.GetClient().Delete(suite.server.Context(), suite.GetEtcdPath("test")) + suite.NoError(err) + } + }() + changes := []*pdpb.GlobalConfigItem{{Kind: pdpb.EventType_PUT, Name: "0", Payload: []byte("0")}, {Kind: pdpb.EventType_PUT, Name: "1", Payload: []byte("1")}, {Kind: pdpb.EventType_PUT, Name: "2", Payload: []byte("2")}} + _, err := suite.server.StoreGlobalConfig(suite.server.Context(), &pdpb.StoreGlobalConfigRequest{ + ConfigPath: globalConfigPath, + Changes: changes, + }) + suite.NoError(err) + for i := 0; i < 3; i++ { + res, err := suite.server.GetClient().Get(suite.server.Context(), suite.GetEtcdPath(strconv.Itoa(i))) + suite.NoError(err) + suite.Equal(suite.GetEtcdPath(string(res.Kvs[0].Value)), string(res.Kvs[0].Key)) + } +} + +func (suite *globalConfigTestSuite) TestWatch() { + defer func() { + for i := 0; i < 3; i++ { + // clean up + _, err := suite.server.GetClient().Delete(suite.server.Context(), suite.GetEtcdPath(strconv.Itoa(i))) + suite.NoError(err) + } + }() + server := testReceiver{re: suite.Require()} + go suite.server.WatchGlobalConfig(&pdpb.WatchGlobalConfigRequest{ + ConfigPath: globalConfigPath, + Revision: 0, + }, server) + for i := 0; i < 6; i++ { + _, err := suite.server.GetClient().Put(suite.server.Context(), suite.GetEtcdPath(strconv.Itoa(i)), strconv.Itoa(i)) + suite.NoError(err) + } + for i := 3; i < 6; i++ { + _, err := suite.server.GetClient().Delete(suite.server.Context(), suite.GetEtcdPath(strconv.Itoa(i))) + suite.NoError(err) + } + res, err := suite.server.LoadGlobalConfig(suite.server.Context(), &pdpb.LoadGlobalConfigRequest{ + ConfigPath: globalConfigPath, + }) + suite.Len(res.Items, 3) + suite.NoError(err) +} + +func (suite *globalConfigTestSuite) TestClientLoadWithoutNames() { + defer func() { + for i := 0; i < 3; i++ { + _, err := suite.server.GetClient().Delete(suite.server.Context(), suite.GetEtcdPath(strconv.Itoa(i))) + suite.NoError(err) + } + }() + for i := 0; i < 3; i++ { + _, err := suite.server.GetClient().Put(suite.server.Context(), suite.GetEtcdPath(strconv.Itoa(i)), strconv.Itoa(i)) + suite.NoError(err) + } + res, _, err := suite.client.LoadGlobalConfig(suite.server.Context(), nil, globalConfigPath) + suite.NoError(err) + suite.Len(res, 3) + for i, item := range res { + suite.Equal(pd.GlobalConfigItem{EventType: pdpb.EventType_PUT, Name: suite.GetEtcdPath(strconv.Itoa(i)), PayLoad: []byte(strconv.Itoa(i)), Value: strconv.Itoa(i)}, item) + } +} + +func (suite *globalConfigTestSuite) TestClientLoadWithoutConfigPath() { + defer func() { + _, err := suite.server.GetClient().Delete(suite.server.Context(), suite.GetEtcdPath("source_id")) + suite.NoError(err) + }() + _, err := suite.server.GetClient().Put(suite.server.Context(), suite.GetEtcdPath("source_id"), "1") + suite.NoError(err) + res, _, err := suite.client.LoadGlobalConfig(suite.server.Context(), []string{"source_id"}, "") + suite.NoError(err) + suite.Len(res, 1) + suite.Equal(pd.GlobalConfigItem{EventType: pdpb.EventType_PUT, Name: "source_id", PayLoad: []byte("1"), Value: "1"}, res[0]) +} + +func (suite *globalConfigTestSuite) TestClientLoadOtherConfigPath() { + defer func() { + for i := 0; i < 3; i++ { + _, err := suite.server.GetClient().Delete(suite.server.Context(), suite.GetEtcdPath(strconv.Itoa(i))) + suite.NoError(err) + } + }() + for i := 0; i < 3; i++ { + _, err := suite.server.GetClient().Put(suite.server.Context(), path.Join("OtherConfigPath", strconv.Itoa(i)), strconv.Itoa(i)) + suite.NoError(err) + } + res, _, err := suite.client.LoadGlobalConfig(suite.server.Context(), []string{"0", "1"}, "OtherConfigPath") + suite.NoError(err) + suite.Len(res, 2) + for i, item := range res { + suite.Equal(pd.GlobalConfigItem{EventType: pdpb.EventType_PUT, Name: strconv.Itoa(i), PayLoad: []byte(strconv.Itoa(i)), Value: strconv.Itoa(i)}, item) + } +} + +func (suite *globalConfigTestSuite) TestClientStore() { + defer func() { + for i := 0; i < 3; i++ { + _, err := suite.server.GetClient().Delete(suite.server.Context(), suite.GetEtcdPath(strconv.Itoa(i))) + suite.NoError(err) + } + }() + err := suite.client.StoreGlobalConfig(suite.server.Context(), globalConfigPath, + []pd.GlobalConfigItem{{Name: "0", Value: "0"}, {Name: "1", Value: "1"}, {Name: "2", Value: "2"}}) + suite.NoError(err) + for i := 0; i < 3; i++ { + res, err := suite.server.GetClient().Get(suite.server.Context(), suite.GetEtcdPath(strconv.Itoa(i))) + suite.NoError(err) + suite.Equal(suite.GetEtcdPath(string(res.Kvs[0].Value)), string(res.Kvs[0].Key)) + } +} + +func (suite *globalConfigTestSuite) TestClientWatchWithRevision() { + defer func() { + _, err := suite.server.GetClient().Delete(suite.server.Context(), suite.GetEtcdPath("test")) + suite.NoError(err) + + for i := 3; i < 9; i++ { + _, err := suite.server.GetClient().Delete(suite.server.Context(), suite.GetEtcdPath(strconv.Itoa(i))) + suite.NoError(err) + } + }() + // Mock get revision by loading + r, err := suite.server.GetClient().Put(suite.server.Context(), suite.GetEtcdPath("test"), "test") + suite.NoError(err) + res, revision, err := suite.client.LoadGlobalConfig(suite.server.Context(), nil, globalConfigPath) + suite.NoError(err) + suite.Len(res, 1) + suite.LessOrEqual(r.Header.GetRevision(), revision) + suite.Equal(pd.GlobalConfigItem{EventType: pdpb.EventType_PUT, Name: suite.GetEtcdPath("test"), PayLoad: []byte("test"), Value: "test"}, res[0]) + // Mock when start watcher there are existed some keys, will load firstly + for i := 0; i < 6; i++ { + _, err = suite.server.GetClient().Put(suite.server.Context(), suite.GetEtcdPath(strconv.Itoa(i)), strconv.Itoa(i)) + suite.NoError(err) + } + // Start watcher at next revision + configChan, err := suite.client.WatchGlobalConfig(suite.server.Context(), globalConfigPath, revision) + suite.NoError(err) + // Mock delete + for i := 0; i < 3; i++ { + _, err = suite.server.GetClient().Delete(suite.server.Context(), suite.GetEtcdPath(strconv.Itoa(i))) + suite.NoError(err) + } + // Mock put + for i := 6; i < 9; i++ { + _, err = suite.server.GetClient().Put(suite.server.Context(), suite.GetEtcdPath(strconv.Itoa(i)), strconv.Itoa(i)) + suite.NoError(err) + } + for { + select { + case <-time.After(time.Second): + return + case res := <-configChan: + for _, r := range res { + suite.Equal(suite.GetEtcdPath(r.Value), r.Name) + } + } + } +} + +func (suite *globalConfigTestSuite) TestEtcdNotStart() { + cli := suite.server.GetClient() + defer func() { + suite.mu.Lock() + suite.server.SetClient(cli) + suite.mu.Unlock() + }() + suite.mu.Lock() + suite.server.SetClient(nil) + suite.mu.Unlock() + err := suite.server.WatchGlobalConfig(&pdpb.WatchGlobalConfigRequest{ + ConfigPath: globalConfigPath, + Revision: 0, + }, nil) + suite.Error(err) + + _, err = suite.server.StoreGlobalConfig(suite.server.Context(), &pdpb.StoreGlobalConfigRequest{ + ConfigPath: globalConfigPath, + Changes: []*pdpb.GlobalConfigItem{{Kind: pdpb.EventType_PUT, Name: "0", Payload: []byte("0")}}, + }) + suite.Error(err) + + _, err = suite.server.LoadGlobalConfig(suite.server.Context(), &pdpb.LoadGlobalConfigRequest{ + Names: []string{"test_etcd"}, + }) + suite.Error(err) +}