Skip to content

Commit

Permalink
mcs: forward store requests to scheduling server (#6976)
Browse files Browse the repository at this point in the history
ref #5839

Signed-off-by: Ryan Leung <[email protected]>
  • Loading branch information
rleungx authored Sep 14, 2023
1 parent 11b752e commit 1a0c8fb
Show file tree
Hide file tree
Showing 15 changed files with 235 additions and 24 deletions.
12 changes: 6 additions & 6 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -16,21 +16,24 @@ require (
github.com/docker/go-units v0.4.0
github.com/elliotchance/pie/v2 v2.1.0
github.com/gin-contrib/cors v1.4.0
github.com/gin-contrib/gzip v0.0.1
github.com/gin-contrib/pprof v1.4.0
github.com/gin-gonic/gin v1.8.1
github.com/go-echarts/go-echarts v1.0.0
github.com/gogo/protobuf v1.3.2
github.com/google/btree v1.1.2
github.com/google/uuid v1.3.0
github.com/gorilla/mux v1.7.4
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0
github.com/joho/godotenv v1.4.0
github.com/mailru/easyjson v0.7.6
github.com/mattn/go-shellwords v1.0.12
github.com/mgechev/revive v1.0.2
github.com/phf/go-queue v0.0.0-20170504031614-9abe38d0371d
github.com/pingcap/errcode v0.3.0
github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c
github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00
github.com/pingcap/kvproto v0.0.0-20230905082026-5336fac26974
github.com/pingcap/kvproto v0.0.0-20230911090708-d603cce32b96
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3
github.com/pingcap/sysutil v1.0.1-0.20230407040306-fb007c5aff21
github.com/pingcap/tidb-dashboard v0.0.0-20230911054332-22add1e00511
Expand All @@ -39,6 +42,7 @@ require (
github.com/sasha-s/go-deadlock v0.2.0
github.com/shirou/gopsutil/v3 v3.23.3
github.com/smallnest/chanx v0.0.0-20221229104322-eb4c998d2072
github.com/soheilhy/cmux v0.1.4
github.com/spf13/cobra v1.0.0
github.com/spf13/pflag v1.0.5
github.com/stretchr/testify v1.8.2
Expand All @@ -48,6 +52,7 @@ require (
github.com/unrolled/render v1.0.1
github.com/urfave/negroni v0.3.0
go.etcd.io/etcd v0.5.0-alpha.5.0.20220915004622-85b640cee793
go.uber.org/atomic v1.10.0
go.uber.org/goleak v1.1.12
go.uber.org/zap v1.24.0
golang.org/x/exp v0.0.0-20230108222341-4b8118a2686a
Expand Down Expand Up @@ -92,7 +97,6 @@ require (
github.com/fogleman/gg v1.3.0 // indirect
github.com/fsnotify/fsnotify v1.4.9 // indirect
github.com/ghodss/yaml v1.0.0 // indirect
github.com/gin-contrib/gzip v0.0.1
github.com/gin-contrib/sse v0.1.0 // indirect
github.com/go-ole/go-ole v1.2.6 // indirect
github.com/go-openapi/jsonpointer v0.19.5 // indirect
Expand All @@ -112,7 +116,6 @@ require (
github.com/golang/snappy v0.0.4 // indirect
github.com/google/pprof v0.0.0-20211122183932-1daafda22083 // indirect
github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510 // indirect
github.com/google/uuid v1.3.0
github.com/gorilla/websocket v1.4.2 // indirect
github.com/grpc-ecosystem/go-grpc-middleware v1.0.1-0.20190118093823-f849b5445de4 // indirect
github.com/grpc-ecosystem/grpc-gateway v1.16.0 // indirect
Expand All @@ -128,7 +131,6 @@ require (
github.com/konsorten/go-windows-terminal-sequences v1.0.3 // indirect
github.com/leodido/go-urn v1.2.1 // indirect
github.com/lufia/plan9stats v0.0.0-20230326075908-cb1d2100619a // indirect
github.com/mailru/easyjson v0.7.6
github.com/mattn/go-colorable v0.1.8 // indirect
github.com/mattn/go-isatty v0.0.14 // indirect
github.com/mattn/go-runewidth v0.0.8 // indirect
Expand Down Expand Up @@ -158,7 +160,6 @@ require (
github.com/shurcooL/httpgzip v0.0.0-20190720172056-320755c1c1b0 // indirect
github.com/shurcooL/sanitized_anchor_name v1.0.0 // indirect
github.com/sirupsen/logrus v1.6.0 // indirect
github.com/soheilhy/cmux v0.1.4
github.com/stretchr/objx v0.5.0 // indirect
github.com/swaggo/files v0.0.0-20210815190702-a29dd2bc99b2 // indirect
github.com/tidwall/gjson v1.9.3 // indirect
Expand All @@ -173,7 +174,6 @@ require (
github.com/yusufpapurcu/wmi v1.2.2 // indirect
// Fix panic in unit test with go >= 1.14, ref: etcd-io/bbolt#201 https://github.com/etcd-io/bbolt/pull/201
go.etcd.io/bbolt v1.3.6 // indirect
go.uber.org/atomic v1.10.0
go.uber.org/dig v1.9.0 // indirect
go.uber.org/fx v1.12.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -439,8 +439,8 @@ github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c/go.mod h1:X2r9ue
github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00 h1:C3N3itkduZXDZFh4N3vQ5HEtld3S+Y+StULhWVvumU0=
github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00/go.mod h1:4qGtCB0QK0wBzKtFEGDhxXnSnbQApw1gc9siScUl8ew=
github.com/pingcap/kvproto v0.0.0-20191211054548-3c6b38ea5107/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w=
github.com/pingcap/kvproto v0.0.0-20230905082026-5336fac26974 h1:Gn8rf2Mb3QDifUQHdtcopqKclc9L11hjhZFYBE65lcw=
github.com/pingcap/kvproto v0.0.0-20230905082026-5336fac26974/go.mod h1:r0q/CFcwvyeRhKtoqzmWMBebrtpIziQQ9vR+JKh1knc=
github.com/pingcap/kvproto v0.0.0-20230911090708-d603cce32b96 h1:Upb52Po0Ev1lPKQdUT4suRwQ5Z49A7gEmJ0trADKftM=
github.com/pingcap/kvproto v0.0.0-20230911090708-d603cce32b96/go.mod h1:r0q/CFcwvyeRhKtoqzmWMBebrtpIziQQ9vR+JKh1knc=
github.com/pingcap/log v0.0.0-20210625125904-98ed8e2eb1c7/go.mod h1:8AanEdAHATuRurdGxZXBz0At+9avep+ub7U1AGYLIMM=
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 h1:HR/ylkkLmGdSSDaD8IDP+SZrdhV1Kibl9KrHxJ9eciw=
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4=
Expand Down
77 changes: 76 additions & 1 deletion pkg/mcs/scheduling/server/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,13 @@ package server

import (
"context"
"errors"
"sync/atomic"
"time"

"github.com/pingcap/errors"
"github.com/pingcap/kvproto/pkg/pdpb"
"github.com/pingcap/kvproto/pkg/schedulingpb"
"github.com/pingcap/log"
"github.com/tikv/pd/pkg/core"
"github.com/tikv/pd/pkg/mcs/scheduling/server/config"
"github.com/tikv/pd/pkg/schedule"
Expand All @@ -18,6 +20,7 @@ import (
"github.com/tikv/pd/pkg/statistics/buckets"
"github.com/tikv/pd/pkg/statistics/utils"
"github.com/tikv/pd/pkg/storage"
"go.uber.org/zap"
)

// Cluster is used to manage all information for scheduling purpose.
Expand All @@ -28,6 +31,7 @@ type Cluster struct {
ruleManager *placement.RuleManager
labelerManager *labeler.RegionLabeler
regionStats *statistics.RegionStatistics
labelLevelStats *statistics.LabelStatistics
hotStat *statistics.HotStat
storage storage.Storage
coordinator *schedule.Coordinator
Expand All @@ -53,6 +57,7 @@ func NewCluster(ctx context.Context, persistConfig *config.PersistConfig, storag
persistConfig: persistConfig,
hotStat: statistics.NewHotStat(ctx),
regionStats: statistics.NewRegionStatistics(basicCluster, persistConfig, ruleManager),
labelLevelStats: statistics.NewLabelStatistics(),
storage: storage,
clusterID: clusterID,
checkMembershipCh: checkMembershipCh,
Expand Down Expand Up @@ -174,4 +179,74 @@ func (c *Cluster) SwitchAPIServerLeader(new pdpb.PDClient) bool {

// UpdateRegionsLabelLevelStats updates the status of the region label level by types.
func (c *Cluster) UpdateRegionsLabelLevelStats(regions []*core.RegionInfo) {
for _, region := range regions {
c.labelLevelStats.Observe(region, c.getStoresWithoutLabelLocked(region, core.EngineKey, core.EngineTiFlash), c.persistConfig.GetLocationLabels())
}
}

func (c *Cluster) getStoresWithoutLabelLocked(region *core.RegionInfo, key, value string) []*core.StoreInfo {
stores := make([]*core.StoreInfo, 0, len(region.GetPeers()))
for _, p := range region.GetPeers() {
if store := c.GetStore(p.GetStoreId()); store != nil && !core.IsStoreContainLabel(store.GetMeta(), key, value) {
stores = append(stores, store)
}
}
return stores
}

// HandleStoreHeartbeat updates the store status.
func (c *Cluster) HandleStoreHeartbeat(heartbeat *schedulingpb.StoreHeartbeatRequest) error {
stats := heartbeat.GetStats()
storeID := stats.GetStoreId()
store := c.GetStore(storeID)
if store == nil {
return errors.Errorf("store %v not found", storeID)
}

nowTime := time.Now()
newStore := store.Clone(core.SetStoreStats(stats), core.SetLastHeartbeatTS(nowTime))

if store := c.GetStore(storeID); store != nil {
statistics.UpdateStoreHeartbeatMetrics(store)
}
c.PutStore(newStore)
c.hotStat.Observe(storeID, newStore.GetStoreStats())
c.hotStat.FilterUnhealthyStore(c)
reportInterval := stats.GetInterval()
interval := reportInterval.GetEndTimestamp() - reportInterval.GetStartTimestamp()

regions := make(map[uint64]*core.RegionInfo, len(stats.GetPeerStats()))
for _, peerStat := range stats.GetPeerStats() {
regionID := peerStat.GetRegionId()
region := c.GetRegion(regionID)
regions[regionID] = region
if region == nil {
log.Warn("discard hot peer stat for unknown region",
zap.Uint64("region-id", regionID),
zap.Uint64("store-id", storeID))
continue
}
peer := region.GetStorePeer(storeID)
if peer == nil {
log.Warn("discard hot peer stat for unknown region peer",
zap.Uint64("region-id", regionID),
zap.Uint64("store-id", storeID))
continue
}
readQueryNum := core.GetReadQueryNum(peerStat.GetQueryStats())
loads := []float64{
utils.RegionReadBytes: float64(peerStat.GetReadBytes()),
utils.RegionReadKeys: float64(peerStat.GetReadKeys()),
utils.RegionReadQueryNum: float64(readQueryNum),
utils.RegionWriteBytes: 0,
utils.RegionWriteKeys: 0,
utils.RegionWriteQueryNum: 0,
}
peerInfo := core.NewPeerInfo(peer, loads, interval)
c.hotStat.CheckReadAsync(statistics.NewCheckPeerTask(peerInfo, region))
}

// Here we will compare the reported regions with the previous hot peers to decide if it is still hot.
c.hotStat.CheckReadAsync(statistics.NewCollectUnReportedPeerTask(storeID, regions, interval))
return nil
}
24 changes: 24 additions & 0 deletions pkg/mcs/scheduling/server/grpc_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,15 @@
package server

import (
"context"
"net/http"

"github.com/pingcap/kvproto/pkg/schedulingpb"
"github.com/pingcap/log"
bs "github.com/tikv/pd/pkg/basicserver"
"github.com/tikv/pd/pkg/mcs/registry"
"github.com/tikv/pd/pkg/utils/apiutil"
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
Expand Down Expand Up @@ -64,8 +67,29 @@ func NewService[T ConfigProvider](svr bs.Server) registry.RegistrableService {
}
}

// StoreHeartbeat implements gRPC PDServer.
func (s *Service) StoreHeartbeat(ctx context.Context, request *schedulingpb.StoreHeartbeatRequest) (*schedulingpb.StoreHeartbeatResponse, error) {
c := s.GetCluster()
if c == nil {
// TODO: add metrics
log.Info("cluster isn't initialized")
return &schedulingpb.StoreHeartbeatResponse{Header: &schedulingpb.ResponseHeader{ClusterId: s.clusterID}}, nil
}

if c.GetStore(request.GetStats().GetStoreId()) == nil {
s.metaWatcher.GetStoreWatcher().ForceLoad()
}

// TODO: add metrics
if err := c.HandleStoreHeartbeat(request); err != nil {
log.Error("handle store heartbeat failed", zap.Error(err))
}
return &schedulingpb.StoreHeartbeatResponse{Header: &schedulingpb.ResponseHeader{ClusterId: s.clusterID}}, nil
}

// RegisterGRPCService registers the service to gRPC server.
func (s *Service) RegisterGRPCService(g *grpc.Server) {
schedulingpb.RegisterSchedulingServer(g, s)
}

// RegisterRESTHandler registers the service to REST server.
Expand Down
5 changes: 5 additions & 0 deletions pkg/mcs/scheduling/server/meta/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,3 +115,8 @@ func (w *Watcher) Close() {
w.cancel()
w.wg.Wait()
}

// GetStoreWatcher returns the store watcher.
func (w *Watcher) GetStoreWatcher() *etcdutil.LoopWatcher {
return w.storeWatcher
}
53 changes: 52 additions & 1 deletion server/grpc_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/pingcap/failpoint"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/kvproto/pkg/pdpb"
"github.com/pingcap/kvproto/pkg/schedulingpb"
"github.com/pingcap/kvproto/pkg/tsopb"
"github.com/pingcap/log"
"github.com/tikv/pd/pkg/core"
Expand Down Expand Up @@ -76,9 +77,29 @@ var (
// GrpcServer wraps Server to provide grpc service.
type GrpcServer struct {
*Server
schedulingClient atomic.Value
concurrentTSOProxyStreamings atomic.Int32
}

type schedulingClient struct {
client schedulingpb.SchedulingClient
lastPrimary string
}

func (s *schedulingClient) getClient() schedulingpb.SchedulingClient {
if s == nil {
return nil
}
return s.client
}

func (s *schedulingClient) getPrimaryAddr() string {
if s == nil {
return ""
}
return s.lastPrimary
}

type request interface {
GetHeader() *pdpb.RequestHeader
}
Expand Down Expand Up @@ -978,8 +999,23 @@ func (s *GrpcServer) StoreHeartbeat(ctx context.Context, request *pdpb.StoreHear
}

s.handleDamagedStore(request.GetStats())

storeHeartbeatHandleDuration.WithLabelValues(storeAddress, storeLabel).Observe(time.Since(start).Seconds())
if s.IsAPIServiceMode() {
s.updateSchedulingClient(ctx)
if s.schedulingClient.Load() != nil {
req := &schedulingpb.StoreHeartbeatRequest{
Header: &schedulingpb.RequestHeader{
ClusterId: request.GetHeader().GetClusterId(),
SenderId: request.GetHeader().GetSenderId(),
},
Stats: request.GetStats(),
}
if _, err := s.schedulingClient.Load().(*schedulingClient).getClient().StoreHeartbeat(ctx, req); err != nil {
// reset to let it be updated in the next request
s.schedulingClient.Store(&schedulingClient{})
}
}
}
}

if status := request.GetDrAutosyncStatus(); status != nil {
Expand All @@ -993,6 +1029,21 @@ func (s *GrpcServer) StoreHeartbeat(ctx context.Context, request *pdpb.StoreHear
return resp, nil
}

func (s *GrpcServer) updateSchedulingClient(ctx context.Context) {
forwardedHost, _ := s.GetServicePrimaryAddr(ctx, utils.SchedulingServiceName)
pre := s.schedulingClient.Load()
if forwardedHost != "" && ((pre == nil) || (pre != nil && forwardedHost != pre.(*schedulingClient).getPrimaryAddr())) {
client, err := s.getDelegateClient(ctx, forwardedHost)
if err != nil {
log.Error("get delegate client failed", zap.Error(err))
}
s.schedulingClient.Store(&schedulingClient{
client: schedulingpb.NewSchedulingClient(client),
lastPrimary: forwardedHost,
})
}
}

// bucketHeartbeatServer wraps PD_ReportBucketsServer to ensure when any error
// occurs on SendAndClose() or Recv(), both endpoints will be closed.
type bucketHeartbeatServer struct {
Expand Down
2 changes: 1 addition & 1 deletion tests/integrations/client/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ replace google.golang.org/grpc v1.54.0 => google.golang.org/grpc v1.26.0
require (
github.com/docker/go-units v0.4.0
github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00
github.com/pingcap/kvproto v0.0.0-20230905082026-5336fac26974
github.com/pingcap/kvproto v0.0.0-20230911090708-d603cce32b96
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3
github.com/stretchr/testify v1.8.2
github.com/tikv/pd v0.0.0-00010101000000-000000000000
Expand Down
4 changes: 2 additions & 2 deletions tests/integrations/client/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -402,8 +402,8 @@ github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c/go.mod h1:X2r9ue
github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00 h1:C3N3itkduZXDZFh4N3vQ5HEtld3S+Y+StULhWVvumU0=
github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00/go.mod h1:4qGtCB0QK0wBzKtFEGDhxXnSnbQApw1gc9siScUl8ew=
github.com/pingcap/kvproto v0.0.0-20191211054548-3c6b38ea5107/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w=
github.com/pingcap/kvproto v0.0.0-20230905082026-5336fac26974 h1:Gn8rf2Mb3QDifUQHdtcopqKclc9L11hjhZFYBE65lcw=
github.com/pingcap/kvproto v0.0.0-20230905082026-5336fac26974/go.mod h1:r0q/CFcwvyeRhKtoqzmWMBebrtpIziQQ9vR+JKh1knc=
github.com/pingcap/kvproto v0.0.0-20230911090708-d603cce32b96 h1:Upb52Po0Ev1lPKQdUT4suRwQ5Z49A7gEmJ0trADKftM=
github.com/pingcap/kvproto v0.0.0-20230911090708-d603cce32b96/go.mod h1:r0q/CFcwvyeRhKtoqzmWMBebrtpIziQQ9vR+JKh1knc=
github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8=
github.com/pingcap/log v0.0.0-20210625125904-98ed8e2eb1c7/go.mod h1:8AanEdAHATuRurdGxZXBz0At+9avep+ub7U1AGYLIMM=
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 h1:HR/ylkkLmGdSSDaD8IDP+SZrdhV1Kibl9KrHxJ9eciw=
Expand Down
2 changes: 1 addition & 1 deletion tests/integrations/mcs/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ replace google.golang.org/grpc v1.54.0 => google.golang.org/grpc v1.26.0

require (
github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00
github.com/pingcap/kvproto v0.0.0-20230905082026-5336fac26974
github.com/pingcap/kvproto v0.0.0-20230911090708-d603cce32b96
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3
github.com/stretchr/testify v1.8.2
github.com/tikv/pd v0.0.0-00010101000000-000000000000
Expand Down
4 changes: 2 additions & 2 deletions tests/integrations/mcs/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -407,8 +407,8 @@ github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c/go.mod h1:X2r9ue
github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00 h1:C3N3itkduZXDZFh4N3vQ5HEtld3S+Y+StULhWVvumU0=
github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00/go.mod h1:4qGtCB0QK0wBzKtFEGDhxXnSnbQApw1gc9siScUl8ew=
github.com/pingcap/kvproto v0.0.0-20191211054548-3c6b38ea5107/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w=
github.com/pingcap/kvproto v0.0.0-20230905082026-5336fac26974 h1:Gn8rf2Mb3QDifUQHdtcopqKclc9L11hjhZFYBE65lcw=
github.com/pingcap/kvproto v0.0.0-20230905082026-5336fac26974/go.mod h1:r0q/CFcwvyeRhKtoqzmWMBebrtpIziQQ9vR+JKh1knc=
github.com/pingcap/kvproto v0.0.0-20230911090708-d603cce32b96 h1:Upb52Po0Ev1lPKQdUT4suRwQ5Z49A7gEmJ0trADKftM=
github.com/pingcap/kvproto v0.0.0-20230911090708-d603cce32b96/go.mod h1:r0q/CFcwvyeRhKtoqzmWMBebrtpIziQQ9vR+JKh1knc=
github.com/pingcap/log v0.0.0-20210625125904-98ed8e2eb1c7/go.mod h1:8AanEdAHATuRurdGxZXBz0At+9avep+ub7U1AGYLIMM=
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 h1:HR/ylkkLmGdSSDaD8IDP+SZrdhV1Kibl9KrHxJ9eciw=
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4=
Expand Down
Loading

0 comments on commit 1a0c8fb

Please sign in to comment.