From d9a6c942a0d73927cab226146cc129aa7f4e9b51 Mon Sep 17 00:00:00 2001 From: Ryan Leung Date: Wed, 13 Sep 2023 18:02:59 +0800 Subject: [PATCH] address the comment Signed-off-by: Ryan Leung --- server/grpc_service.go | 20 +++++++++++++------- server/server.go | 40 ---------------------------------------- 2 files changed, 13 insertions(+), 47 deletions(-) diff --git a/server/grpc_service.go b/server/grpc_service.go index 8198d8f960f3..a433733525dd 100644 --- a/server/grpc_service.go +++ b/server/grpc_service.go @@ -77,6 +77,8 @@ var ( // GrpcServer wraps Server to provide grpc service. type GrpcServer struct { *Server + lastSchedulingPrimary string + schedulingClient schedulingpb.SchedulingClient concurrentTSOProxyStreamings atomic.Int32 } @@ -981,8 +983,8 @@ func (s *GrpcServer) StoreHeartbeat(ctx context.Context, request *pdpb.StoreHear s.handleDamagedStore(request.GetStats()) storeHeartbeatHandleDuration.WithLabelValues(storeAddress, storeLabel).Observe(time.Since(start).Seconds()) if s.IsAPIServiceMode() { - client := s.getForwardedClient(ctx) - if client != nil { + s.updateSchedulingClient(ctx) + if s.schedulingClient != nil { req := &schedulingpb.StoreHeartbeatRequest{ Header: &schedulingpb.RequestHeader{ ClusterId: request.GetHeader().GetClusterId(), @@ -990,7 +992,11 @@ func (s *GrpcServer) StoreHeartbeat(ctx context.Context, request *pdpb.StoreHear }, Stats: request.GetStats(), } - schedulingpb.NewSchedulingClient(client).StoreHeartbeat(ctx, req) + if _, err := s.schedulingClient.StoreHeartbeat(ctx, req); err != nil { + // reset to let it be updated in the next request + s.schedulingClient = nil + s.lastSchedulingPrimary = "" + } } } } @@ -1006,16 +1012,16 @@ func (s *GrpcServer) StoreHeartbeat(ctx context.Context, request *pdpb.StoreHear return resp, nil } -func (s *GrpcServer) getForwardedClient(ctx context.Context) *grpc.ClientConn { +func (s *GrpcServer) updateSchedulingClient(ctx context.Context) { forwardedHost, _ := s.GetServicePrimaryAddr(ctx, utils.SchedulingServiceName) - if forwardedHost != "" { + if forwardedHost != "" && forwardedHost != s.lastSchedulingPrimary { client, err := s.getDelegateClient(ctx, forwardedHost) if err != nil { log.Error("get delegate client failed", zap.Error(err)) } - return client + s.lastSchedulingPrimary = forwardedHost + s.schedulingClient = schedulingpb.NewSchedulingClient(client) } - return nil } // bucketHeartbeatServer wraps PD_ReportBucketsServer to ensure when any error diff --git a/server/server.go b/server/server.go index 9e980b539677..2a076923caf5 100644 --- a/server/server.go +++ b/server/server.go @@ -22,7 +22,6 @@ import ( "math/rand" "net/http" "os" - "path" "path/filepath" "runtime" "strconv" @@ -40,7 +39,6 @@ import ( "github.com/pingcap/kvproto/pkg/keyspacepb" "github.com/pingcap/kvproto/pkg/metapb" "github.com/pingcap/kvproto/pkg/pdpb" - "github.com/pingcap/kvproto/pkg/schedulingpb" "github.com/pingcap/kvproto/pkg/tsopb" "github.com/pingcap/log" "github.com/pingcap/sysutil" @@ -2013,44 +2011,6 @@ func (s *Server) initServicePrimaryWatcher(serviceName string, primaryKey string ) } -func (s *Server) initSchedulingPrimaryWatcher() { - serviceName := mcs.SchedulingServiceName - schedulingRootPath := endpoint.SchedulingSvcRootPath(s.clusterID) - schedulingServicePrimaryKey := path.Join(schedulingRootPath, mcs.PrimaryKey) - putFn := func(kv *mvccpb.KeyValue) error { - primary := &schedulingpb.Participant{} // TODO: use Generics - if err := proto.Unmarshal(kv.Value, primary); err != nil { - return err - } - listenUrls := primary.GetListenUrls() - if len(listenUrls) > 0 { - s.servicePrimaryMap.Store(serviceName, listenUrls[0]) - log.Info("update scheduling primary", zap.String("primary", listenUrls[0])) - } - return nil - } - deleteFn := func(kv *mvccpb.KeyValue) error { - var oldPrimary string - v, ok := s.servicePrimaryMap.Load(serviceName) - if ok { - oldPrimary = v.(string) - } - log.Info("delete scheduling primary", zap.String("old-primary", oldPrimary)) - s.servicePrimaryMap.Delete(serviceName) - return nil - } - s.schedulingPrimaryWatcher = etcdutil.NewLoopWatcher( - s.serverLoopCtx, - &s.serverLoopWg, - s.client, - "scheduling-primary-watcher", - schedulingServicePrimaryKey, - putFn, - deleteFn, - func() error { return nil }, - ) -} - // RecoverAllocID recover alloc id. set current base id to input id func (s *Server) RecoverAllocID(ctx context.Context, id uint64) error { return s.idAllocator.SetBase(id)