Skip to content

Commit

Permalink
This is an automated cherry-pick of tikv#6866
Browse files Browse the repository at this point in the history
close tikv#6860

Signed-off-by: ti-chi-bot <[email protected]>
  • Loading branch information
HuSharp authored and ti-chi-bot committed Aug 10, 2023
1 parent adf0402 commit 8922a7e
Show file tree
Hide file tree
Showing 3 changed files with 454 additions and 0 deletions.
55 changes: 55 additions & 0 deletions server/grpc_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,20 @@ const (
var (
// ErrNotLeader is returned when current server is not the leader and not possible to process request.
// TODO: work as proxy.
<<<<<<< HEAD
ErrNotLeader = status.Errorf(codes.Unavailable, "not leader")
ErrNotStarted = status.Errorf(codes.Unavailable, "server not started")
ErrSendHeartbeatTimeout = status.Errorf(codes.DeadlineExceeded, "send heartbeat timeout")
=======
ErrNotLeader = status.Errorf(codes.Unavailable, "not leader")
ErrNotStarted = status.Errorf(codes.Unavailable, "server not started")
ErrSendHeartbeatTimeout = status.Errorf(codes.DeadlineExceeded, "send heartbeat timeout")
ErrNotFoundTSOAddr = status.Errorf(codes.NotFound, "not found tso address")
ErrForwardTSOTimeout = status.Errorf(codes.DeadlineExceeded, "forward tso request timeout")
ErrMaxCountTSOProxyRoutinesExceeded = status.Errorf(codes.ResourceExhausted, "max count of concurrent tso proxy routines exceeded")
ErrTSOProxyRecvFromClientTimeout = status.Errorf(codes.DeadlineExceeded, "tso proxy timeout when receiving from client; stream closed by server")
ErrEtcdNotStarted = status.Errorf(codes.Unavailable, "server is started, but etcd not started")
>>>>>>> c97dfcb74 (global config: fix etcd client not found (#6866))
)

// GrpcServer wraps Server to provide grpc service.
Expand Down Expand Up @@ -1896,6 +1907,16 @@ func checkStream(streamCtx context.Context, cancel context.CancelFunc, done chan

// StoreGlobalConfig store global config into etcd by transaction
func (s *GrpcServer) StoreGlobalConfig(_ context.Context, request *pdpb.StoreGlobalConfigRequest) (*pdpb.StoreGlobalConfigResponse, error) {
<<<<<<< HEAD
=======
if s.client == nil {
return nil, ErrEtcdNotStarted
}
configPath := request.GetConfigPath()
if configPath == "" {
configPath = globalConfigPath
}
>>>>>>> c97dfcb74 (global config: fix etcd client not found (#6866))
ops := make([]clientv3.Op, len(request.Changes))
for i, item := range request.Changes {
name := globalConfigPath + item.GetName()
Expand All @@ -1915,6 +1936,7 @@ func (s *GrpcServer) StoreGlobalConfig(_ context.Context, request *pdpb.StoreGlo

// LoadGlobalConfig load global config from etcd
func (s *GrpcServer) LoadGlobalConfig(ctx context.Context, request *pdpb.LoadGlobalConfigRequest) (*pdpb.LoadGlobalConfigResponse, error) {
<<<<<<< HEAD
names := request.Names
res := make([]*pdpb.GlobalConfigItem, len(names))
for i, name := range names {
Expand All @@ -1926,15 +1948,48 @@ func (s *GrpcServer) LoadGlobalConfig(ctx context.Context, request *pdpb.LoadGlo
res[i] = &pdpb.GlobalConfigItem{Name: name, Error: &pdpb.Error{Type: pdpb.ErrorType_GLOBAL_CONFIG_NOT_FOUND, Message: msg}}
} else {
res[i] = &pdpb.GlobalConfigItem{Name: name, Value: string(r.Kvs[0].Value)}
=======
if s.client == nil {
return nil, ErrEtcdNotStarted
}
configPath := request.GetConfigPath()
if configPath == "" {
configPath = globalConfigPath
}
// Since item value needs to support marshal of different struct types,
// it should be set to `Payload bytes` instead of `Value string`.
if request.Names != nil {
res := make([]*pdpb.GlobalConfigItem, len(request.Names))
for i, name := range request.Names {
r, err := s.client.Get(ctx, path.Join(configPath, name))
if err != nil {
res[i] = &pdpb.GlobalConfigItem{Name: name, Error: &pdpb.Error{Type: pdpb.ErrorType_UNKNOWN, Message: err.Error()}}
} else if len(r.Kvs) == 0 {
msg := "key " + name + " not found"
res[i] = &pdpb.GlobalConfigItem{Name: name, Error: &pdpb.Error{Type: pdpb.ErrorType_GLOBAL_CONFIG_NOT_FOUND, Message: msg}}
} else {
res[i] = &pdpb.GlobalConfigItem{Name: name, Payload: r.Kvs[0].Value, Kind: pdpb.EventType_PUT}
}
>>>>>>> c97dfcb74 (global config: fix etcd client not found (#6866))
}
}
return &pdpb.LoadGlobalConfigResponse{Items: res}, nil
}

<<<<<<< HEAD
// WatchGlobalConfig if the connection of WatchGlobalConfig is end
// or stoped by whatever reason
// just reconnect to it.
func (s *GrpcServer) WatchGlobalConfig(_ *pdpb.WatchGlobalConfigRequest, server pdpb.PD_WatchGlobalConfigServer) error {
=======
// WatchGlobalConfig will retry on recoverable errors forever until reconnected
// by Etcd.Watch() as long as the context has not been canceled or timed out.
// Watch on revision which greater than or equal to the required revision.
func (s *GrpcServer) WatchGlobalConfig(req *pdpb.WatchGlobalConfigRequest, server pdpb.PD_WatchGlobalConfigServer) error {
if s.client == nil {
return ErrEtcdNotStarted
}
>>>>>>> c97dfcb74 (global config: fix etcd client not found (#6866))
ctx, cancel := context.WithCancel(s.Context())
defer cancel()
err := s.sendAllGlobalConfig(ctx, server)
Expand Down
45 changes: 45 additions & 0 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -1770,3 +1770,48 @@ func (s *Server) SetExternalTS(externalTS uint64) error {
s.GetRaftCluster().SetExternalTS(externalTS)
return nil
}
<<<<<<< HEAD
=======

// IsLocalTSOEnabled returns if the local TSO is enabled.
func (s *Server) IsLocalTSOEnabled() bool {
return s.cfg.IsLocalTSOEnabled()
}

// GetMaxConcurrentTSOProxyStreamings returns the max concurrent TSO proxy streamings.
// If the value is negative, there is no limit.
func (s *Server) GetMaxConcurrentTSOProxyStreamings() int {
return s.cfg.GetMaxConcurrentTSOProxyStreamings()
}

// GetTSOProxyRecvFromClientTimeout returns timeout value for TSO proxy receiving from the client.
func (s *Server) GetTSOProxyRecvFromClientTimeout() time.Duration {
return s.cfg.GetTSOProxyRecvFromClientTimeout()
}

// GetLeaderLease returns the leader lease.
func (s *Server) GetLeaderLease() int64 {
return s.cfg.GetLeaderLease()
}

// GetTSOSaveInterval returns TSO save interval.
func (s *Server) GetTSOSaveInterval() time.Duration {
return s.cfg.GetTSOSaveInterval()
}

// GetTSOUpdatePhysicalInterval returns TSO update physical interval.
func (s *Server) GetTSOUpdatePhysicalInterval() time.Duration {
return s.cfg.GetTSOUpdatePhysicalInterval()
}

// GetMaxResetTSGap gets the max gap to reset the tso.
func (s *Server) GetMaxResetTSGap() time.Duration {
return s.persistOptions.GetMaxResetTSGap()
}

// SetClient sets the etcd client.
// Notes: it is only used for test.
func (s *Server) SetClient(client *clientv3.Client) {
s.client = client
}
>>>>>>> c97dfcb74 (global config: fix etcd client not found (#6866))
Loading

0 comments on commit 8922a7e

Please sign in to comment.