Skip to content

Commit

Permalink
This is an automated cherry-pick of #6866
Browse files Browse the repository at this point in the history
close #6860

Signed-off-by: ti-chi-bot <[email protected]>
  • Loading branch information
HuSharp authored and ti-chi-bot committed Aug 10, 2023
1 parent 1198030 commit bb287a4
Show file tree
Hide file tree
Showing 3 changed files with 418 additions and 0 deletions.
58 changes: 58 additions & 0 deletions server/grpc_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,9 +58,20 @@ const (
var (
// ErrNotLeader is returned when current server is not the leader and not possible to process request.
// TODO: work as proxy.
<<<<<<< HEAD
ErrNotLeader = status.Errorf(codes.Unavailable, "not leader")
ErrNotStarted = status.Errorf(codes.Unavailable, "server not started")
ErrSendHeartbeatTimeout = status.Errorf(codes.DeadlineExceeded, "send heartbeat timeout")
=======
ErrNotLeader = status.Errorf(codes.Unavailable, "not leader")
ErrNotStarted = status.Errorf(codes.Unavailable, "server not started")
ErrSendHeartbeatTimeout = status.Errorf(codes.DeadlineExceeded, "send heartbeat timeout")
ErrNotFoundTSOAddr = status.Errorf(codes.NotFound, "not found tso address")
ErrForwardTSOTimeout = status.Errorf(codes.DeadlineExceeded, "forward tso request timeout")
ErrMaxCountTSOProxyRoutinesExceeded = status.Errorf(codes.ResourceExhausted, "max count of concurrent tso proxy routines exceeded")
ErrTSOProxyRecvFromClientTimeout = status.Errorf(codes.DeadlineExceeded, "tso proxy timeout when receiving from client; stream closed by server")
ErrEtcdNotStarted = status.Errorf(codes.Unavailable, "server is started, but etcd not started")
>>>>>>> c97dfcb74 (global config: fix etcd client not found (#6866))
)

// GrpcServer wraps Server to provide grpc service.
Expand Down Expand Up @@ -1706,7 +1717,20 @@ func checkStream(streamCtx context.Context, cancel context.CancelFunc, done chan
}

// StoreGlobalConfig store global config into etcd by transaction
<<<<<<< HEAD
func (s *GrpcServer) StoreGlobalConfig(ctx context.Context, request *pdpb.StoreGlobalConfigRequest) (*pdpb.StoreGlobalConfigResponse, error) {
=======
// Since item value needs to support marshal of different struct types,
// it should be set to `Payload bytes` instead of `Value string`
func (s *GrpcServer) StoreGlobalConfig(_ context.Context, request *pdpb.StoreGlobalConfigRequest) (*pdpb.StoreGlobalConfigResponse, error) {
if s.client == nil {
return nil, ErrEtcdNotStarted
}
configPath := request.GetConfigPath()
if configPath == "" {
configPath = globalConfigPath
}
>>>>>>> c97dfcb74 (global config: fix etcd client not found (#6866))
ops := make([]clientv3.Op, len(request.Changes))
for i, item := range request.Changes {
name := globalConfigPath + item.GetName()
Expand All @@ -1726,6 +1750,7 @@ func (s *GrpcServer) StoreGlobalConfig(ctx context.Context, request *pdpb.StoreG

// LoadGlobalConfig load global config from etcd
func (s *GrpcServer) LoadGlobalConfig(ctx context.Context, request *pdpb.LoadGlobalConfigRequest) (*pdpb.LoadGlobalConfigResponse, error) {
<<<<<<< HEAD
names := request.Names
res := make([]*pdpb.GlobalConfigItem, len(names))
for i, name := range names {
Expand All @@ -1737,15 +1762,48 @@ func (s *GrpcServer) LoadGlobalConfig(ctx context.Context, request *pdpb.LoadGlo
res[i] = &pdpb.GlobalConfigItem{Name: name, Error: &pdpb.Error{Type: pdpb.ErrorType_GLOBAL_CONFIG_NOT_FOUND, Message: msg}}
} else {
res[i] = &pdpb.GlobalConfigItem{Name: name, Value: string(r.Kvs[0].Value)}
=======
if s.client == nil {
return nil, ErrEtcdNotStarted
}
configPath := request.GetConfigPath()
if configPath == "" {
configPath = globalConfigPath
}
// Since item value needs to support marshal of different struct types,
// it should be set to `Payload bytes` instead of `Value string`.
if request.Names != nil {
res := make([]*pdpb.GlobalConfigItem, len(request.Names))
for i, name := range request.Names {
r, err := s.client.Get(ctx, path.Join(configPath, name))
if err != nil {
res[i] = &pdpb.GlobalConfigItem{Name: name, Error: &pdpb.Error{Type: pdpb.ErrorType_UNKNOWN, Message: err.Error()}}
} else if len(r.Kvs) == 0 {
msg := "key " + name + " not found"
res[i] = &pdpb.GlobalConfigItem{Name: name, Error: &pdpb.Error{Type: pdpb.ErrorType_GLOBAL_CONFIG_NOT_FOUND, Message: msg}}
} else {
res[i] = &pdpb.GlobalConfigItem{Name: name, Payload: r.Kvs[0].Value, Kind: pdpb.EventType_PUT}
}
>>>>>>> c97dfcb74 (global config: fix etcd client not found (#6866))
}
}
return &pdpb.LoadGlobalConfigResponse{Items: res}, nil
}

<<<<<<< HEAD
// WatchGlobalConfig if the connection of WatchGlobalConfig is end
// or stoped by whatever reason
// just reconnect to it.
func (s *GrpcServer) WatchGlobalConfig(request *pdpb.WatchGlobalConfigRequest, server pdpb.PD_WatchGlobalConfigServer) error {
=======
// WatchGlobalConfig will retry on recoverable errors forever until reconnected
// by Etcd.Watch() as long as the context has not been canceled or timed out.
// Watch on revision which greater than or equal to the required revision.
func (s *GrpcServer) WatchGlobalConfig(req *pdpb.WatchGlobalConfigRequest, server pdpb.PD_WatchGlobalConfigServer) error {
if s.client == nil {
return ErrEtcdNotStarted
}
>>>>>>> c97dfcb74 (global config: fix etcd client not found (#6866))
ctx, cancel := context.WithCancel(s.Context())
defer cancel()
err := s.sendAllGlobalConfig(ctx, server)
Expand Down
6 changes: 6 additions & 0 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -1414,3 +1414,9 @@ func (s *Server) SaveTTLConfig(data map[string]interface{}, ttl time.Duration) e
func (s *Server) SplitAndScatterRegions(context context.Context, r *pdpb.SplitAndScatterRegionsRequest) (*pdpb.SplitAndScatterRegionsResponse, error) {
return nil, errors.New("no implemented")
}

// SetClient sets the etcd client.
// Notes: it is only used for test.
func (s *Server) SetClient(client *clientv3.Client) {
s.client = client
}
Loading

0 comments on commit bb287a4

Please sign in to comment.