Skip to content

Commit

Permalink
address the comment
Browse files Browse the repository at this point in the history
Signed-off-by: Ryan Leung <[email protected]>
  • Loading branch information
rleungx committed Sep 13, 2023
1 parent 53f609f commit d9a6c94
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 47 deletions.
20 changes: 13 additions & 7 deletions server/grpc_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,8 @@ var (
// GrpcServer wraps Server to provide grpc service.
type GrpcServer struct {
*Server
lastSchedulingPrimary string
schedulingClient schedulingpb.SchedulingClient
concurrentTSOProxyStreamings atomic.Int32
}

Expand Down Expand Up @@ -981,16 +983,20 @@ func (s *GrpcServer) StoreHeartbeat(ctx context.Context, request *pdpb.StoreHear
s.handleDamagedStore(request.GetStats())
storeHeartbeatHandleDuration.WithLabelValues(storeAddress, storeLabel).Observe(time.Since(start).Seconds())
if s.IsAPIServiceMode() {
client := s.getForwardedClient(ctx)
if client != nil {
s.updateSchedulingClient(ctx)
if s.schedulingClient != nil {
req := &schedulingpb.StoreHeartbeatRequest{
Header: &schedulingpb.RequestHeader{
ClusterId: request.GetHeader().GetClusterId(),
SenderId: request.GetHeader().GetSenderId(),
},
Stats: request.GetStats(),
}
schedulingpb.NewSchedulingClient(client).StoreHeartbeat(ctx, req)
if _, err := s.schedulingClient.StoreHeartbeat(ctx, req); err != nil {
// reset to let it be updated in the next request
s.schedulingClient = nil
s.lastSchedulingPrimary = ""
}
}
}
}
Expand All @@ -1006,16 +1012,16 @@ func (s *GrpcServer) StoreHeartbeat(ctx context.Context, request *pdpb.StoreHear
return resp, nil
}

func (s *GrpcServer) getForwardedClient(ctx context.Context) *grpc.ClientConn {
func (s *GrpcServer) updateSchedulingClient(ctx context.Context) {
forwardedHost, _ := s.GetServicePrimaryAddr(ctx, utils.SchedulingServiceName)
if forwardedHost != "" {
if forwardedHost != "" && forwardedHost != s.lastSchedulingPrimary {
client, err := s.getDelegateClient(ctx, forwardedHost)
if err != nil {
log.Error("get delegate client failed", zap.Error(err))
}
return client
s.lastSchedulingPrimary = forwardedHost
s.schedulingClient = schedulingpb.NewSchedulingClient(client)
}
return nil
}

// bucketHeartbeatServer wraps PD_ReportBucketsServer to ensure when any error
Expand Down
40 changes: 0 additions & 40 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
"math/rand"
"net/http"
"os"
"path"
"path/filepath"
"runtime"
"strconv"
Expand All @@ -40,7 +39,6 @@ import (
"github.com/pingcap/kvproto/pkg/keyspacepb"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/kvproto/pkg/pdpb"
"github.com/pingcap/kvproto/pkg/schedulingpb"
"github.com/pingcap/kvproto/pkg/tsopb"
"github.com/pingcap/log"
"github.com/pingcap/sysutil"
Expand Down Expand Up @@ -2013,44 +2011,6 @@ func (s *Server) initServicePrimaryWatcher(serviceName string, primaryKey string
)
}

func (s *Server) initSchedulingPrimaryWatcher() {
serviceName := mcs.SchedulingServiceName
schedulingRootPath := endpoint.SchedulingSvcRootPath(s.clusterID)
schedulingServicePrimaryKey := path.Join(schedulingRootPath, mcs.PrimaryKey)
putFn := func(kv *mvccpb.KeyValue) error {
primary := &schedulingpb.Participant{} // TODO: use Generics
if err := proto.Unmarshal(kv.Value, primary); err != nil {
return err
}
listenUrls := primary.GetListenUrls()
if len(listenUrls) > 0 {
s.servicePrimaryMap.Store(serviceName, listenUrls[0])
log.Info("update scheduling primary", zap.String("primary", listenUrls[0]))
}
return nil
}
deleteFn := func(kv *mvccpb.KeyValue) error {
var oldPrimary string
v, ok := s.servicePrimaryMap.Load(serviceName)
if ok {
oldPrimary = v.(string)
}
log.Info("delete scheduling primary", zap.String("old-primary", oldPrimary))
s.servicePrimaryMap.Delete(serviceName)
return nil
}
s.schedulingPrimaryWatcher = etcdutil.NewLoopWatcher(
s.serverLoopCtx,
&s.serverLoopWg,
s.client,
"scheduling-primary-watcher",
schedulingServicePrimaryKey,
putFn,
deleteFn,
func() error { return nil },
)
}

// RecoverAllocID recover alloc id. set current base id to input id
func (s *Server) RecoverAllocID(ctx context.Context, id uint64) error {
return s.idAllocator.SetBase(id)
Expand Down

0 comments on commit d9a6c94

Please sign in to comment.