Skip to content

Commit

Permalink
add retry mech
Browse files Browse the repository at this point in the history
Signed-off-by: husharp <[email protected]>
  • Loading branch information
HuSharp committed Jul 31, 2023
1 parent 56f5844 commit 63ac7f9
Show file tree
Hide file tree
Showing 3 changed files with 42 additions and 16 deletions.
22 changes: 12 additions & 10 deletions server/grpc_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ var (
ErrForwardTSOTimeout = status.Errorf(codes.DeadlineExceeded, "forward tso request timeout")
ErrMaxCountTSOProxyRoutinesExceeded = status.Errorf(codes.ResourceExhausted, "max count of concurrent tso proxy routines exceeded")
ErrTSOProxyRecvFromClientTimeout = status.Errorf(codes.DeadlineExceeded, "tso proxy timeout when receiving from client; stream closed by server")
ErrEtcdClientRequestFailed = status.Errorf(codes.Internal, "etcd client request failed")
)

// GrpcServer wraps Server to provide grpc service.
Expand Down Expand Up @@ -275,15 +276,15 @@ func (s *GrpcServer) getMinTSFromSingleServer(
}

// GetMembers implements gRPC PDServer.
func (s *GrpcServer) GetMembers(context.Context, *pdpb.GetMembersRequest) (*pdpb.GetMembersResponse, error) {
func (s *GrpcServer) GetMembers(ctx context.Context, _ *pdpb.GetMembersRequest) (*pdpb.GetMembersResponse, error) {
// Here we purposely do not check the cluster ID because the client does not know the correct cluster ID
// at startup and needs to get the cluster ID with the first request (i.e. GetMembers).
if s.IsClosed() {
return &pdpb.GetMembersResponse{
Header: s.wrapErrorToHeader(pdpb.ErrorType_UNKNOWN, errs.ErrServerNotStarted.FastGenByArgs().Error()),
}, nil
}
members, err := cluster.GetMembers(s.GetClient())
members, err := cluster.GetMembers(s.GetEtcdClientWithRetry(ctx))
if err != nil {
return &pdpb.GetMembersResponse{
Header: s.wrapErrorToHeader(pdpb.ErrorType_UNKNOWN, err.Error()),
Expand Down Expand Up @@ -2281,9 +2282,9 @@ const globalConfigPath = "/global/config/"
// StoreGlobalConfig stores global config into etcd in a transaction.
// Since item value needs to support marshal of different struct types,
// it should be set to `Payload bytes` instead of `Value string`
func (s *GrpcServer) StoreGlobalConfig(_ context.Context, request *pdpb.StoreGlobalConfigRequest) (*pdpb.StoreGlobalConfigResponse, error) {
if s.client == nil {
return nil, errors.New("failed to store global config, etcd client not found")
func (s *GrpcServer) StoreGlobalConfig(ctx context.Context, request *pdpb.StoreGlobalConfigRequest) (*pdpb.StoreGlobalConfigResponse, error) {
if s.GetEtcdClientWithRetry(ctx) == nil {
return &pdpb.StoreGlobalConfigResponse{}, ErrEtcdClientRequestFailed
}
configPath := request.GetConfigPath()
if configPath == "" {
Expand Down Expand Up @@ -2319,8 +2320,8 @@ func (s *GrpcServer) StoreGlobalConfig(_ context.Context, request *pdpb.StoreGlo
// - `Names` iteratively gets each value from `ConfigPath/Name` without regard to revision
// - `ConfigPath` if `Names` is nil can get all values and revision of current path
func (s *GrpcServer) LoadGlobalConfig(ctx context.Context, request *pdpb.LoadGlobalConfigRequest) (*pdpb.LoadGlobalConfigResponse, error) {
if s.client == nil {
return nil, errors.New("failed to load global config, etcd client not found")
if s.GetEtcdClientWithRetry(ctx) == nil {
return &pdpb.LoadGlobalConfigResponse{}, ErrEtcdClientRequestFailed
}
configPath := request.GetConfigPath()
if configPath == "" {
Expand Down Expand Up @@ -2358,11 +2359,12 @@ func (s *GrpcServer) LoadGlobalConfig(ctx context.Context, request *pdpb.LoadGlo
// by Etcd.Watch() as long as the context has not been canceled or timed out.
// Watch on revisions which are greater than or equal to the required revision.
func (s *GrpcServer) WatchGlobalConfig(req *pdpb.WatchGlobalConfigRequest, server pdpb.PD_WatchGlobalConfigServer) error {
if s.client == nil {
return errors.Errorf("failed to watch global config, etcd client not found")
}
ctx, cancel := context.WithCancel(s.Context())
defer cancel()
if s.GetEtcdClientWithRetry(ctx) == nil {
return ErrEtcdClientRequestFailed
}
println("WatchGlobalConfig")
configPath := req.GetConfigPath()
if configPath == "" {
configPath = globalConfigPath
Expand Down
16 changes: 16 additions & 0 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -786,6 +786,22 @@ func (s *Server) GetClient() *clientv3.Client {
return s.client
}

// GetEtcdClientWithRetry returns the server's etcd client, waiting for it to
// be set when it is currently nil.
//
// While s.client is nil, the method re-checks it once per
// retryIntervalGetServicePrimary tick. It stops waiting and returns nil as
// soon as ctx is done or EtcdStartTimeout has elapsed since the call started,
// whichever happens first.
func (s *Server) GetEtcdClientWithRetry(ctx context.Context) *clientv3.Client {
	// Bound the total wait so a client that never appears cannot block forever.
	waitCtx, stop := context.WithTimeout(ctx, EtcdStartTimeout)
	defer stop()

	poll := time.NewTicker(retryIntervalGetServicePrimary)
	defer poll.Stop()

	for {
		if s.client != nil {
			return s.client
		}
		select {
		case <-waitCtx.Done():
			// Timed out, or the caller's context was canceled.
			return nil
		case <-poll.C:
			// Next polling interval reached; loop around and re-check.
		}
	}
}

// GetHTTPClient returns builtin http client.
func (s *Server) GetHTTPClient() *http.Client {
return s.httpClient
Expand Down
20 changes: 14 additions & 6 deletions tests/integrations/client/global_config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -335,20 +335,28 @@ func (suite *globalConfigTestSuite) TestEtcdNotStart() {
suite.mu.Lock()
suite.server.SetClient(nil)
suite.mu.Unlock()
err := suite.server.WatchGlobalConfig(&pdpb.WatchGlobalConfigRequest{

go func() {
time.Sleep(100 * time.Millisecond)
suite.mu.Lock()
suite.server.SetClient(cli)
suite.mu.Unlock()
}()

server := testReceiver{re: suite.Require()}
go suite.server.WatchGlobalConfig(&pdpb.WatchGlobalConfigRequest{
ConfigPath: globalConfigPath,
Revision: 0,
}, nil)
suite.Error(err)
}, server)

_, err = suite.server.StoreGlobalConfig(suite.server.Context(), &pdpb.StoreGlobalConfigRequest{
_, err := suite.server.StoreGlobalConfig(suite.server.Context(), &pdpb.StoreGlobalConfigRequest{
ConfigPath: globalConfigPath,
Changes: []*pdpb.GlobalConfigItem{{Kind: pdpb.EventType_PUT, Name: "0", Payload: []byte("0")}},
})
suite.Error(err)
suite.NoError(err)

_, err = suite.server.LoadGlobalConfig(suite.server.Context(), &pdpb.LoadGlobalConfigRequest{
Names: []string{"test_etcd"},
})
suite.Error(err)
suite.NoError(err)
}

0 comments on commit 63ac7f9

Please sign in to comment.