From 643c2c370b9a711197ce62b3ed7e18a60e8afd0a Mon Sep 17 00:00:00 2001 From: Ryan Leung Date: Wed, 20 Sep 2023 13:58:43 +0800 Subject: [PATCH] mcs: forward region requests to scheduling server (#7023) ref tikv/pd#5839 Signed-off-by: Ryan Leung --- pkg/cluster/cluster.go | 65 ++++++++++ pkg/core/region.go | 62 ++++++--- pkg/keyspace/keyspace.go | 2 +- pkg/keyspace/util.go | 4 +- pkg/keyspace/util_test.go | 2 +- pkg/mcs/scheduling/server/cluster.go | 78 ++++++++++- pkg/mcs/scheduling/server/config/config.go | 1 - pkg/mcs/scheduling/server/grpc_service.go | 104 +++++++++++++++ pkg/mcs/scheduling/server/server.go | 2 +- pkg/mock/mockhbstream/mockhbstream.go | 8 +- pkg/mock/mockhbstream/mockhbstream_test.go | 20 +-- pkg/schedule/hbstream/heartbeat_streams.go | 82 +++++++++--- pkg/schedule/operator/step.go | 51 ++++---- server/api/stats.go | 2 +- server/cluster/cluster.go | 46 +++---- server/cluster/cluster_test.go | 8 +- server/grpc_service.go | 97 +++++++++++++- server/server.go | 2 +- tests/integrations/mcs/go.mod | 2 +- .../mcs/scheduling/config_test.go | 3 +- .../integrations/mcs/scheduling/rule_test.go | 122 +++++++----------- .../mcs/scheduling/server_test.go | 79 ++++++++++++ tests/pdctl/scheduler/scheduler_test.go | 32 ++++- tests/server/api/api_test.go | 5 + 24 files changed, 673 insertions(+), 206 deletions(-) create mode 100644 pkg/cluster/cluster.go diff --git a/pkg/cluster/cluster.go b/pkg/cluster/cluster.go new file mode 100644 index 00000000000..0b3e0351b16 --- /dev/null +++ b/pkg/cluster/cluster.go @@ -0,0 +1,65 @@ +// Copyright 2023 TiKV Project Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package cluster + +import ( + "github.com/tikv/pd/pkg/core" + "github.com/tikv/pd/pkg/schedule" + "github.com/tikv/pd/pkg/schedule/placement" + "github.com/tikv/pd/pkg/statistics" +) + +// Cluster provides an overview of a cluster's basic information. +type Cluster interface { + GetHotStat() *statistics.HotStat + GetRegionStats() *statistics.RegionStatistics + GetLabelStats() *statistics.LabelStatistics + GetCoordinator() *schedule.Coordinator + GetRuleManager() *placement.RuleManager +} + +// HandleStatsAsync handles the flow asynchronously. +func HandleStatsAsync(c Cluster, region *core.RegionInfo) { + c.GetHotStat().CheckWriteAsync(statistics.NewCheckExpiredItemTask(region)) + c.GetHotStat().CheckReadAsync(statistics.NewCheckExpiredItemTask(region)) + reportInterval := region.GetInterval() + interval := reportInterval.GetEndTimestamp() - reportInterval.GetStartTimestamp() + for _, peer := range region.GetPeers() { + peerInfo := core.NewPeerInfo(peer, region.GetWriteLoads(), interval) + c.GetHotStat().CheckWriteAsync(statistics.NewCheckPeerTask(peerInfo, region)) + } + c.GetCoordinator().GetSchedulersController().CheckTransferWitnessLeader(region) +} + +// HandleOverlaps handles the overlap regions. 
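HandleStatsAsync above, together with the HandleOverlaps and Collect helpers that follow, is the small shared surface that both PD's RaftCluster and the scheduling microservice's Cluster now call from their region-heartbeat paths (see pkg/mcs/scheduling/server/cluster.go and server/cluster/cluster.go later in this patch). A condensed sketch of how the three helpers are meant to be strung together; the handleHeartbeat wrapper and its signature are illustrative only, and error handling plus the API-service-mode checks are omitted:

    // import (
    //     "github.com/tikv/pd/pkg/cluster"
    //     "github.com/tikv/pd/pkg/core"
    // )
    func handleHeartbeat(c cluster.Cluster, bc *core.BasicCluster, region *core.RegionInfo,
        stores []*core.StoreInfo, isNew, isPrepared bool) error {
        // Feed hot-peer statistics and the transfer-witness-leader check asynchronously.
        cluster.HandleStatsAsync(c, region)

        // Update the region tree; regions replaced by this write come back as overlaps.
        overlaps, err := bc.AtomicCheckAndPutRegion(region)
        if err != nil {
            return err
        }
        // Clear their defunct statistics and invalidate placement-rule caches.
        cluster.HandleOverlaps(c, overlaps)

        // Observe the new region state and, while still warming up, feed the prepare checker.
        cluster.Collect(c, region, stores, true /* hasRegionStats */, isNew, isPrepared)
        return nil
    }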
+func HandleOverlaps(c Cluster, overlaps []*core.RegionInfo) { + for _, item := range overlaps { + if c.GetRegionStats() != nil { + c.GetRegionStats().ClearDefunctRegion(item.GetID()) + } + c.GetLabelStats().ClearDefunctRegion(item.GetID()) + c.GetRuleManager().InvalidCache(item.GetID()) + } +} + +// Collect collects the cluster information. +func Collect(c Cluster, region *core.RegionInfo, stores []*core.StoreInfo, hasRegionStats, isNew, isPrepared bool) { + if hasRegionStats { + c.GetRegionStats().Observe(region, stores) + } + if !isPrepared && isNew { + c.GetCoordinator().GetPrepareChecker().Collect(region) + } +} diff --git a/pkg/core/region.go b/pkg/core/region.go index 450fab499e6..f90c0c58cd6 100644 --- a/pkg/core/region.go +++ b/pkg/core/region.go @@ -143,8 +143,31 @@ const ( InitClusterRegionThreshold = 100 ) +// RegionHeartbeatResponse is the interface for region heartbeat response. +type RegionHeartbeatResponse interface { + GetTargetPeer() *metapb.Peer + GetRegionId() uint64 +} + +// RegionHeartbeatRequest is the interface for region heartbeat request. +type RegionHeartbeatRequest interface { + GetTerm() uint64 + GetRegion() *metapb.Region + GetLeader() *metapb.Peer + GetDownPeers() []*pdpb.PeerStats + GetPendingPeers() []*metapb.Peer + GetBytesWritten() uint64 + GetKeysWritten() uint64 + GetBytesRead() uint64 + GetKeysRead() uint64 + GetInterval() *pdpb.TimeInterval + GetQueryStats() *pdpb.QueryStats + GetApproximateSize() uint64 + GetApproximateKeys() uint64 +} + // RegionFromHeartbeat constructs a Region from region heartbeat. -func RegionFromHeartbeat(heartbeat *pdpb.RegionHeartbeatRequest, opts ...RegionCreateOption) *RegionInfo { +func RegionFromHeartbeat(heartbeat RegionHeartbeatRequest, opts ...RegionCreateOption) *RegionInfo { // Convert unit to MB. // If region isn't empty and less than 1MB, use 1MB instead. // The size of empty region will be correct by the previous RegionInfo. @@ -152,25 +175,28 @@ func RegionFromHeartbeat(heartbeat *pdpb.RegionHeartbeatRequest, opts ...RegionC if heartbeat.GetApproximateSize() > 0 && regionSize < EmptyRegionApproximateSize { regionSize = EmptyRegionApproximateSize } - regionKvSize := heartbeat.GetApproximateKvSize() / units.MiB region := &RegionInfo{ - term: heartbeat.GetTerm(), - meta: heartbeat.GetRegion(), - leader: heartbeat.GetLeader(), - downPeers: heartbeat.GetDownPeers(), - pendingPeers: heartbeat.GetPendingPeers(), - cpuUsage: heartbeat.GetCpuUsage(), - writtenBytes: heartbeat.GetBytesWritten(), - writtenKeys: heartbeat.GetKeysWritten(), - readBytes: heartbeat.GetBytesRead(), - readKeys: heartbeat.GetKeysRead(), - approximateSize: int64(regionSize), - approximateKvSize: int64(regionKvSize), - approximateKeys: int64(heartbeat.GetApproximateKeys()), - interval: heartbeat.GetInterval(), - replicationStatus: heartbeat.GetReplicationStatus(), - queryStats: heartbeat.GetQueryStats(), + term: heartbeat.GetTerm(), + meta: heartbeat.GetRegion(), + leader: heartbeat.GetLeader(), + downPeers: heartbeat.GetDownPeers(), + pendingPeers: heartbeat.GetPendingPeers(), + writtenBytes: heartbeat.GetBytesWritten(), + writtenKeys: heartbeat.GetKeysWritten(), + readBytes: heartbeat.GetBytesRead(), + readKeys: heartbeat.GetKeysRead(), + approximateSize: int64(regionSize), + approximateKeys: int64(heartbeat.GetApproximateKeys()), + interval: heartbeat.GetInterval(), + queryStats: heartbeat.GetQueryStats(), + } + + // scheduling service doesn't need the following fields. 
+ if h, ok := heartbeat.(*pdpb.RegionHeartbeatRequest); ok { + region.approximateKvSize = int64(h.GetApproximateKvSize() / units.MiB) + region.replicationStatus = h.GetReplicationStatus() + region.cpuUsage = h.GetCpuUsage() } for _, opt := range opts { diff --git a/pkg/keyspace/keyspace.go b/pkg/keyspace/keyspace.go index c46350f1e54..85e2bf9ba59 100644 --- a/pkg/keyspace/keyspace.go +++ b/pkg/keyspace/keyspace.go @@ -358,7 +358,7 @@ func (manager *Manager) splitKeyspaceRegion(id uint32, waitRegionSplit bool) (er start := time.Now() skipRaw := manager.config.GetDisableRawKVRegionSplit() - keyspaceRule := MakeLabelRule(id, skipRaw) + keyspaceRule := makeLabelRule(id, skipRaw) cl, ok := manager.cluster.(interface{ GetRegionLabeler() *labeler.RegionLabeler }) if !ok { return errors.New("cluster does not support region label") diff --git a/pkg/keyspace/util.go b/pkg/keyspace/util.go index 640ca561b40..e580412cb16 100644 --- a/pkg/keyspace/util.go +++ b/pkg/keyspace/util.go @@ -203,8 +203,8 @@ func getRegionLabelID(id uint32) string { return regionLabelIDPrefix + strconv.FormatUint(uint64(id), endpoint.SpaceIDBase) } -// MakeLabelRule makes the label rule for the given keyspace id. -func MakeLabelRule(id uint32, skipRaw bool) *labeler.LabelRule { +// makeLabelRule makes the label rule for the given keyspace id. +func makeLabelRule(id uint32, skipRaw bool) *labeler.LabelRule { return &labeler.LabelRule{ ID: getRegionLabelID(id), Index: 0, diff --git a/pkg/keyspace/util_test.go b/pkg/keyspace/util_test.go index 346a04f6fc7..4656e158dfc 100644 --- a/pkg/keyspace/util_test.go +++ b/pkg/keyspace/util_test.go @@ -166,6 +166,6 @@ func TestMakeLabelRule(t *testing.T) { }, } for _, testCase := range testCases { - re.Equal(testCase.expectedLabelRule, MakeLabelRule(testCase.id, testCase.skipRaw)) + re.Equal(testCase.expectedLabelRule, makeLabelRule(testCase.id, testCase.skipRaw)) } } diff --git a/pkg/mcs/scheduling/server/cluster.go b/pkg/mcs/scheduling/server/cluster.go index 6660e94ccc5..d1d031aa636 100644 --- a/pkg/mcs/scheduling/server/cluster.go +++ b/pkg/mcs/scheduling/server/cluster.go @@ -10,6 +10,7 @@ import ( "github.com/pingcap/kvproto/pkg/pdpb" "github.com/pingcap/kvproto/pkg/schedulingpb" "github.com/pingcap/log" + "github.com/tikv/pd/pkg/cluster" "github.com/tikv/pd/pkg/core" "github.com/tikv/pd/pkg/errs" "github.com/tikv/pd/pkg/mcs/scheduling/server/config" @@ -17,6 +18,7 @@ import ( sc "github.com/tikv/pd/pkg/schedule/config" "github.com/tikv/pd/pkg/schedule/hbstream" "github.com/tikv/pd/pkg/schedule/labeler" + "github.com/tikv/pd/pkg/schedule/operator" "github.com/tikv/pd/pkg/schedule/placement" "github.com/tikv/pd/pkg/schedule/schedulers" "github.com/tikv/pd/pkg/slice" @@ -38,7 +40,7 @@ type Cluster struct { ruleManager *placement.RuleManager labelerManager *labeler.RegionLabeler regionStats *statistics.RegionStatistics - labelLevelStats *statistics.LabelStatistics + labelStats *statistics.LabelStatistics hotStat *statistics.HotStat storage storage.Storage coordinator *schedule.Coordinator @@ -66,8 +68,8 @@ func NewCluster(parentCtx context.Context, persistConfig *config.PersistConfig, labelerManager: labelerManager, persistConfig: persistConfig, hotStat: statistics.NewHotStat(ctx), + labelStats: statistics.NewLabelStatistics(), regionStats: statistics.NewRegionStatistics(basicCluster, persistConfig, ruleManager), - labelLevelStats: statistics.NewLabelStatistics(), storage: storage, clusterID: clusterID, checkMembershipCh: checkMembershipCh, @@ -86,6 +88,21 @@ func (c *Cluster) 
GetCoordinator() *schedule.Coordinator { return c.coordinator } +// GetHotStat gets hot stat. +func (c *Cluster) GetHotStat() *statistics.HotStat { + return c.hotStat +} + +// GetRegionStats gets region statistics. +func (c *Cluster) GetRegionStats() *statistics.RegionStatistics { + return c.regionStats +} + +// GetLabelStats gets label statistics. +func (c *Cluster) GetLabelStats() *statistics.LabelStatistics { + return c.labelStats +} + // GetBasicCluster returns the basic cluster. func (c *Cluster) GetBasicCluster() *core.BasicCluster { return c.BasicCluster @@ -287,7 +304,7 @@ func (c *Cluster) waitSchedulersInitialized() { // UpdateRegionsLabelLevelStats updates the status of the region label level by types. func (c *Cluster) UpdateRegionsLabelLevelStats(regions []*core.RegionInfo) { for _, region := range regions { - c.labelLevelStats.Observe(region, c.getStoresWithoutLabelLocked(region, core.EngineKey, core.EngineTiFlash), c.persistConfig.GetLocationLabels()) + c.labelStats.Observe(region, c.getStoresWithoutLabelLocked(region, core.EngineKey, core.EngineTiFlash), c.persistConfig.GetLocationLabels()) } } @@ -389,3 +406,58 @@ func (c *Cluster) StopBackgroundJobs() { c.cancel() c.wg.Wait() } + +// HandleRegionHeartbeat processes RegionInfo reports from client. +func (c *Cluster) HandleRegionHeartbeat(region *core.RegionInfo) error { + if err := c.processRegionHeartbeat(region); err != nil { + return err + } + + c.coordinator.GetOperatorController().Dispatch(region, operator.DispatchFromHeartBeat, c.coordinator.RecordOpStepWithTTL) + return nil +} + +// processRegionHeartbeat updates the region information. +func (c *Cluster) processRegionHeartbeat(region *core.RegionInfo) error { + origin, _, err := c.PreCheckPutRegion(region) + if err != nil { + return err + } + + cluster.HandleStatsAsync(c, region) + + hasRegionStats := c.regionStats != nil + // Save to storage if meta is updated, except for flashback. + // Save to cache if meta or leader is updated, or contains any down/pending peer. + // Mark isNew if the region in cache does not have leader. + isNew, _, saveCache, _ := core.GenerateRegionGuideFunc(true)(region, origin) + if !saveCache && !isNew { + // Due to some config changes need to update the region stats as well, + // so we do some extra checks here. + if hasRegionStats && c.regionStats.RegionStatsNeedUpdate(region) { + c.regionStats.Observe(region, c.GetRegionStores(region)) + } + return nil + } + + var overlaps []*core.RegionInfo + if saveCache { + // To prevent a concurrent heartbeat of another region from overriding the up-to-date region info by a stale one, + // check its validation again here. + // + // However it can't solve the race condition of concurrent heartbeats from the same region. + if overlaps, err = c.AtomicCheckAndPutRegion(region); err != nil { + return err + } + + cluster.HandleOverlaps(c, overlaps) + } + + cluster.Collect(c, region, c.GetRegionStores(region), hasRegionStats, isNew, c.IsPrepared()) + return nil +} + +// IsPrepared return true if the prepare checker is ready. 
+func (c *Cluster) IsPrepared() bool { + return c.coordinator.GetPrepareChecker().IsPrepared() +} diff --git a/pkg/mcs/scheduling/server/config/config.go b/pkg/mcs/scheduling/server/config/config.go index 82c15632b3d..e1d680069ce 100644 --- a/pkg/mcs/scheduling/server/config/config.go +++ b/pkg/mcs/scheduling/server/config/config.go @@ -110,7 +110,6 @@ func (c *Config) Parse(flagSet *pflag.FlagSet) error { configutil.AdjustCommandLineString(flagSet, &c.BackendEndpoints, "backend-endpoints") configutil.AdjustCommandLineString(flagSet, &c.ListenAddr, "listen-addr") configutil.AdjustCommandLineString(flagSet, &c.AdvertiseListenAddr, "advertise-listen-addr") - return c.adjust(meta) } diff --git a/pkg/mcs/scheduling/server/grpc_service.go b/pkg/mcs/scheduling/server/grpc_service.go index f615e0c37c0..4558688822a 100644 --- a/pkg/mcs/scheduling/server/grpc_service.go +++ b/pkg/mcs/scheduling/server/grpc_service.go @@ -16,13 +16,19 @@ package server import ( "context" + "io" "net/http" + "sync/atomic" + "time" + "github.com/pingcap/errors" "github.com/pingcap/kvproto/pkg/schedulingpb" "github.com/pingcap/log" bs "github.com/tikv/pd/pkg/basicserver" + "github.com/tikv/pd/pkg/core" "github.com/tikv/pd/pkg/mcs/registry" "github.com/tikv/pd/pkg/utils/apiutil" + "github.com/tikv/pd/pkg/utils/logutil" "go.uber.org/zap" "google.golang.org/grpc" "google.golang.org/grpc/codes" @@ -67,6 +73,104 @@ func NewService[T ConfigProvider](svr bs.Server) registry.RegistrableService { } } +// heartbeatServer wraps Scheduling_RegionHeartbeatServer to ensure when any error +// occurs on Send() or Recv(), both endpoints will be closed. +type heartbeatServer struct { + stream schedulingpb.Scheduling_RegionHeartbeatServer + closed int32 +} + +func (s *heartbeatServer) Send(m core.RegionHeartbeatResponse) error { + if atomic.LoadInt32(&s.closed) == 1 { + return io.EOF + } + done := make(chan error, 1) + go func() { + defer logutil.LogPanic() + done <- s.stream.Send(m.(*schedulingpb.RegionHeartbeatResponse)) + }() + timer := time.NewTimer(5 * time.Second) + defer timer.Stop() + select { + case err := <-done: + if err != nil { + atomic.StoreInt32(&s.closed, 1) + } + return errors.WithStack(err) + case <-timer.C: + atomic.StoreInt32(&s.closed, 1) + return status.Errorf(codes.DeadlineExceeded, "send heartbeat timeout") + } +} + +func (s *heartbeatServer) Recv() (*schedulingpb.RegionHeartbeatRequest, error) { + if atomic.LoadInt32(&s.closed) == 1 { + return nil, io.EOF + } + req, err := s.stream.Recv() + if err != nil { + atomic.StoreInt32(&s.closed, 1) + return nil, errors.WithStack(err) + } + return req, nil +} + +// RegionHeartbeat implements gRPC PDServer. 
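The heartbeatServer wrapper above guards the generated stream's blocking Send with a timer so a stuck TiKV connection cannot wedge the heartbeat loop, and an atomic closed flag makes later calls fail fast. The same pattern, reduced to a self-contained, runnable sketch with a stand-in sender interface (nothing here is PD or gRPC API):

    package main

    import (
        "errors"
        "fmt"
        "sync/atomic"
        "time"
    )

    // sender stands in for the generated gRPC stream's blocking Send method.
    type sender interface {
        Send(msg string) error
    }

    // guardedStream mirrors the heartbeatServer shape: Send runs in a goroutine
    // so the caller can give up on a timer, and a closed flag short-circuits
    // every call made after a failure or timeout.
    type guardedStream struct {
        stream sender
        closed int32
    }

    func (g *guardedStream) send(msg string, timeout time.Duration) error {
        if atomic.LoadInt32(&g.closed) == 1 {
            return errors.New("stream closed")
        }
        done := make(chan error, 1)
        go func() { done <- g.stream.Send(msg) }()
        timer := time.NewTimer(timeout)
        defer timer.Stop()
        select {
        case err := <-done:
            if err != nil {
                atomic.StoreInt32(&g.closed, 1)
            }
            return err
        case <-timer.C:
            atomic.StoreInt32(&g.closed, 1)
            return errors.New("send heartbeat timeout")
        }
    }

    // slowSender simulates a TiKV connection that has stopped draining messages.
    type slowSender struct{ delay time.Duration }

    func (s slowSender) Send(string) error { time.Sleep(s.delay); return nil }

    func main() {
        g := &guardedStream{stream: slowSender{delay: 2 * time.Second}}
        fmt.Println(g.send("ping", 100*time.Millisecond)) // send heartbeat timeout
        fmt.Println(g.send("ping", 100*time.Millisecond)) // stream closed (fails fast)
    }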
+func (s *Service) RegionHeartbeat(stream schedulingpb.Scheduling_RegionHeartbeatServer) error { + var ( + server = &heartbeatServer{stream: stream} + cancel context.CancelFunc + lastBind time.Time + ) + defer func() { + // cancel the forward stream + if cancel != nil { + cancel() + } + }() + + for { + request, err := server.Recv() + if err == io.EOF { + return nil + } + if err != nil { + return errors.WithStack(err) + } + + c := s.GetCluster() + if c == nil { + resp := &schedulingpb.RegionHeartbeatResponse{Header: &schedulingpb.ResponseHeader{ + ClusterId: s.clusterID, + Error: &schedulingpb.Error{ + Type: schedulingpb.ErrorType_NOT_BOOTSTRAPPED, + Message: "scheduling server is not initialized yet", + }, + }} + err := server.Send(resp) + return errors.WithStack(err) + } + + storeID := request.GetLeader().GetStoreId() + store := c.GetStore(storeID) + if store == nil { + return errors.Errorf("invalid store ID %d, not found", storeID) + } + + if time.Since(lastBind) > time.Minute { + s.hbStreams.BindStream(storeID, server) + lastBind = time.Now() + } + region := core.RegionFromHeartbeat(request, core.SetFromHeartbeat(true)) + err = c.HandleRegionHeartbeat(region) + if err != nil { + // TODO: if we need to send the error back to API server. + log.Error("failed handle region heartbeat", zap.Error(err)) + continue + } + } +} + // StoreHeartbeat implements gRPC PDServer. func (s *Service) StoreHeartbeat(ctx context.Context, request *schedulingpb.StoreHeartbeatRequest) (*schedulingpb.StoreHeartbeatResponse, error) { c := s.GetCluster() diff --git a/pkg/mcs/scheduling/server/server.go b/pkg/mcs/scheduling/server/server.go index f4c5c676dd3..fd7621bf2cb 100644 --- a/pkg/mcs/scheduling/server/server.go +++ b/pkg/mcs/scheduling/server/server.go @@ -437,7 +437,7 @@ func (s *Server) startCluster(context.Context) error { if err != nil { return err } - s.hbStreams = hbstream.NewHeartbeatStreams(s.Context(), s.clusterID, s.basicCluster) + s.hbStreams = hbstream.NewHeartbeatStreams(s.Context(), s.clusterID, utils.SchedulingServiceName, s.basicCluster) s.cluster, err = NewCluster(s.Context(), s.persistConfig, s.storage, s.basicCluster, s.hbStreams, s.clusterID, s.checkMembershipCh) if err != nil { return err diff --git a/pkg/mock/mockhbstream/mockhbstream.go b/pkg/mock/mockhbstream/mockhbstream.go index c94042bf102..289f31d63dd 100644 --- a/pkg/mock/mockhbstream/mockhbstream.go +++ b/pkg/mock/mockhbstream/mockhbstream.go @@ -25,18 +25,18 @@ import ( // HeartbeatStream is used to mock HeartbeatStream for test use. type HeartbeatStream struct { - ch chan *pdpb.RegionHeartbeatResponse + ch chan core.RegionHeartbeatResponse } // NewHeartbeatStream creates a new HeartbeatStream. func NewHeartbeatStream() HeartbeatStream { return HeartbeatStream{ - ch: make(chan *pdpb.RegionHeartbeatResponse), + ch: make(chan core.RegionHeartbeatResponse), } } // Send mocks method. -func (s HeartbeatStream) Send(m *pdpb.RegionHeartbeatResponse) error { +func (s HeartbeatStream) Send(m core.RegionHeartbeatResponse) error { select { case <-time.After(time.Second): return errors.New("timeout") @@ -52,7 +52,7 @@ func (s HeartbeatStream) SendMsg(region *core.RegionInfo, msg *pdpb.RegionHeartb func (s HeartbeatStream) BindStream(storeID uint64, stream hbstream.HeartbeatStream) {} // Recv mocks method. 
-func (s HeartbeatStream) Recv() *pdpb.RegionHeartbeatResponse { +func (s HeartbeatStream) Recv() core.RegionHeartbeatResponse { select { case <-time.After(time.Millisecond * 10): return nil diff --git a/pkg/mock/mockhbstream/mockhbstream_test.go b/pkg/mock/mockhbstream/mockhbstream_test.go index 46af7df534b..a8e88f61aee 100644 --- a/pkg/mock/mockhbstream/mockhbstream_test.go +++ b/pkg/mock/mockhbstream/mockhbstream_test.go @@ -22,12 +22,10 @@ import ( "github.com/pingcap/kvproto/pkg/metapb" "github.com/pingcap/kvproto/pkg/pdpb" "github.com/stretchr/testify/require" - "github.com/tikv/pd/pkg/core" "github.com/tikv/pd/pkg/mock/mockcluster" "github.com/tikv/pd/pkg/mock/mockconfig" "github.com/tikv/pd/pkg/schedule/hbstream" "github.com/tikv/pd/pkg/utils/testutil" - "github.com/tikv/pd/pkg/utils/typeutil" ) func TestActivity(t *testing.T) { @@ -41,37 +39,33 @@ func TestActivity(t *testing.T) { cluster.AddRegionStore(2, 0) cluster.AddLeaderRegion(1, 1) region := cluster.GetRegion(1) - msg := &pdpb.RegionHeartbeatResponse{ - ChangePeer: &pdpb.ChangePeer{Peer: &metapb.Peer{Id: 2, StoreId: 2}, ChangeType: eraftpb.ConfChangeType_AddLearnerNode}, - } - hbs := hbstream.NewTestHeartbeatStreams(ctx, cluster.ID, cluster, true) stream1, stream2 := NewHeartbeatStream(), NewHeartbeatStream() // Active stream is stream1. hbs.BindStream(1, stream1) testutil.Eventually(re, func() bool { - newMsg := typeutil.DeepClone(msg, core.RegionHeartbeatResponseFactory) - hbs.SendMsg(region, newMsg) + msg := &hbstream.Operation{ChangePeer: &pdpb.ChangePeer{Peer: &metapb.Peer{Id: 2, StoreId: 2}, ChangeType: eraftpb.ConfChangeType_AddLearnerNode}} + hbs.SendMsg(region, msg) return stream1.Recv() != nil && stream2.Recv() == nil }) // Rebind to stream2. hbs.BindStream(1, stream2) testutil.Eventually(re, func() bool { - newMsg := typeutil.DeepClone(msg, core.RegionHeartbeatResponseFactory) - hbs.SendMsg(region, newMsg) + msg := &hbstream.Operation{ChangePeer: &pdpb.ChangePeer{Peer: &metapb.Peer{Id: 2, StoreId: 2}, ChangeType: eraftpb.ConfChangeType_AddLearnerNode}} + hbs.SendMsg(region, msg) return stream1.Recv() == nil && stream2.Recv() != nil }) // SendErr to stream2. hbs.SendErr(pdpb.ErrorType_UNKNOWN, "test error", &metapb.Peer{Id: 1, StoreId: 1}) res := stream2.Recv() re.NotNil(res) - re.NotNil(res.GetHeader().GetError()) + re.NotNil(res.(*pdpb.RegionHeartbeatResponse).GetHeader().GetError()) // Switch back to 1 again. hbs.BindStream(1, stream1) testutil.Eventually(re, func() bool { - newMsg := typeutil.DeepClone(msg, core.RegionHeartbeatResponseFactory) - hbs.SendMsg(region, newMsg) + msg := &hbstream.Operation{ChangePeer: &pdpb.ChangePeer{Peer: &metapb.Peer{Id: 2, StoreId: 2}, ChangeType: eraftpb.ConfChangeType_AddLearnerNode}} + hbs.SendMsg(region, msg) return stream1.Recv() != nil && stream2.Recv() == nil }) } diff --git a/pkg/schedule/hbstream/heartbeat_streams.go b/pkg/schedule/hbstream/heartbeat_streams.go index d80b6ff3a46..e7d7f688035 100644 --- a/pkg/schedule/hbstream/heartbeat_streams.go +++ b/pkg/schedule/hbstream/heartbeat_streams.go @@ -23,16 +23,30 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/kvproto/pkg/metapb" "github.com/pingcap/kvproto/pkg/pdpb" + "github.com/pingcap/kvproto/pkg/schedulingpb" "github.com/pingcap/log" "github.com/tikv/pd/pkg/core" "github.com/tikv/pd/pkg/errs" + "github.com/tikv/pd/pkg/mcs/utils" "github.com/tikv/pd/pkg/utils/logutil" "go.uber.org/zap" ) +// Operation is detailed scheduling step of a region. 
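The Operation struct defined below replaces *pdpb.RegionHeartbeatResponse as the value operator steps hand to the heartbeat streams: the GetCmd implementations in pkg/schedule/operator now return this protocol-neutral form, and SendMsg converts it into a pdpb or schedulingpb response depending on which service created the streams. A rough sketch of that hand-off; the pushStep wrapper is illustrative (imports and error handling omitted), not the actual dispatch code in this patch:

    // step is any operator.OpStep; hbs is the HeartbeatStreams owned by either
    // PD itself or the scheduling microservice.
    func pushStep(hbs *hbstream.HeartbeatStreams, step operator.OpStep,
        region *core.RegionInfo, useConfChangeV2 bool) {
        op := step.GetCmd(region, useConfChangeV2) // protocol-neutral *hbstream.Operation
        if op == nil {
            return // e.g. a passive merge, or a peer that is already in place
        }
        // SendMsg fills in the header, region id/epoch and target peer, then builds
        // the concrete RegionHeartbeatResponse type for the service that owns the
        // streams (utils.SchedulingServiceName vs. the PD default).
        hbs.SendMsg(region, op)
    }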
+type Operation struct { + ChangePeer *pdpb.ChangePeer + // Pd can return transfer_leader to let TiKV does leader transfer itself. + TransferLeader *pdpb.TransferLeader + Merge *pdpb.Merge + // PD sends split_region to let TiKV split a region into two regions. + SplitRegion *pdpb.SplitRegion + ChangePeerV2 *pdpb.ChangePeerV2 + SwitchWitnesses *pdpb.BatchSwitchWitness +} + // HeartbeatStream is an interface. type HeartbeatStream interface { - Send(*pdpb.RegionHeartbeatResponse) error + Send(core.RegionHeartbeatResponse) error } const ( @@ -52,33 +66,35 @@ type HeartbeatStreams struct { hbStreamCancel context.CancelFunc clusterID uint64 streams map[uint64]HeartbeatStream - msgCh chan *pdpb.RegionHeartbeatResponse + msgCh chan core.RegionHeartbeatResponse streamCh chan streamUpdate storeInformer core.StoreSetInformer + typ string needRun bool // For test only. } // NewHeartbeatStreams creates a new HeartbeatStreams which enable background running by default. -func NewHeartbeatStreams(ctx context.Context, clusterID uint64, storeInformer core.StoreSetInformer) *HeartbeatStreams { - return newHbStreams(ctx, clusterID, storeInformer, true) +func NewHeartbeatStreams(ctx context.Context, clusterID uint64, typ string, storeInformer core.StoreSetInformer) *HeartbeatStreams { + return newHbStreams(ctx, clusterID, typ, storeInformer, true) } // NewTestHeartbeatStreams creates a new HeartbeatStreams for test purpose only. // Please use NewHeartbeatStreams for other usage. func NewTestHeartbeatStreams(ctx context.Context, clusterID uint64, storeInformer core.StoreSetInformer, needRun bool) *HeartbeatStreams { - return newHbStreams(ctx, clusterID, storeInformer, needRun) + return newHbStreams(ctx, clusterID, "", storeInformer, needRun) } -func newHbStreams(ctx context.Context, clusterID uint64, storeInformer core.StoreSetInformer, needRun bool) *HeartbeatStreams { +func newHbStreams(ctx context.Context, clusterID uint64, typ string, storeInformer core.StoreSetInformer, needRun bool) *HeartbeatStreams { hbStreamCtx, hbStreamCancel := context.WithCancel(ctx) hs := &HeartbeatStreams{ hbStreamCtx: hbStreamCtx, hbStreamCancel: hbStreamCancel, clusterID: clusterID, streams: make(map[uint64]HeartbeatStream), - msgCh: make(chan *pdpb.RegionHeartbeatResponse, heartbeatChanCapacity), + msgCh: make(chan core.RegionHeartbeatResponse, heartbeatChanCapacity), streamCh: make(chan streamUpdate, 1), storeInformer: storeInformer, + typ: typ, needRun: needRun, } if needRun { @@ -96,7 +112,13 @@ func (s *HeartbeatStreams) run() { keepAliveTicker := time.NewTicker(heartbeatStreamKeepAliveInterval) defer keepAliveTicker.Stop() - keepAlive := &pdpb.RegionHeartbeatResponse{Header: &pdpb.ResponseHeader{ClusterId: s.clusterID}} + var keepAlive core.RegionHeartbeatResponse + switch s.typ { + case utils.SchedulingServiceName: + keepAlive = &schedulingpb.RegionHeartbeatResponse{Header: &schedulingpb.ResponseHeader{ClusterId: s.clusterID}} + default: + keepAlive = &pdpb.RegionHeartbeatResponse{Header: &pdpb.ResponseHeader{ClusterId: s.clusterID}} + } for { select { @@ -108,7 +130,7 @@ func (s *HeartbeatStreams) run() { store := s.storeInformer.GetStore(storeID) if store == nil { log.Error("failed to get store", - zap.Uint64("region-id", msg.RegionId), + zap.Uint64("region-id", msg.GetRegionId()), zap.Uint64("store-id", storeID), errs.ZapError(errs.ErrGetSourceStore)) delete(s.streams, storeID) continue @@ -117,7 +139,7 @@ func (s *HeartbeatStreams) run() { if stream, ok := s.streams[storeID]; ok { if err := stream.Send(msg); err != 
nil { log.Error("send heartbeat message fail", - zap.Uint64("region-id", msg.RegionId), errs.ZapError(errs.ErrGRPCSend.Wrap(err).GenWithStackByArgs())) + zap.Uint64("region-id", msg.GetRegionId()), errs.ZapError(errs.ErrGRPCSend.Wrap(err).GenWithStackByArgs())) delete(s.streams, storeID) heartbeatStreamCounter.WithLabelValues(storeAddress, storeLabel, "push", "err").Inc() } else { @@ -125,7 +147,7 @@ func (s *HeartbeatStreams) run() { } } else { log.Debug("heartbeat stream not found, skip send message", - zap.Uint64("region-id", msg.RegionId), + zap.Uint64("region-id", msg.GetRegionId()), zap.Uint64("store-id", storeID)) heartbeatStreamCounter.WithLabelValues(storeAddress, storeLabel, "push", "skip").Inc() } @@ -174,18 +196,44 @@ func (s *HeartbeatStreams) BindStream(storeID uint64, stream HeartbeatStream) { } // SendMsg sends a message to related store. -func (s *HeartbeatStreams) SendMsg(region *core.RegionInfo, msg *pdpb.RegionHeartbeatResponse) { +func (s *HeartbeatStreams) SendMsg(region *core.RegionInfo, op *Operation) { if region.GetLeader() == nil { return } - msg.Header = &pdpb.ResponseHeader{ClusterId: s.clusterID} - msg.RegionId = region.GetID() - msg.RegionEpoch = region.GetRegionEpoch() - msg.TargetPeer = region.GetLeader() + // TODO: use generic + var resp core.RegionHeartbeatResponse + switch s.typ { + case utils.SchedulingServiceName: + resp = &schedulingpb.RegionHeartbeatResponse{ + Header: &schedulingpb.ResponseHeader{ClusterId: s.clusterID}, + RegionId: region.GetID(), + RegionEpoch: region.GetRegionEpoch(), + TargetPeer: region.GetLeader(), + ChangePeer: op.ChangePeer, + TransferLeader: op.TransferLeader, + Merge: op.Merge, + SplitRegion: op.SplitRegion, + ChangePeerV2: op.ChangePeerV2, + SwitchWitnesses: op.SwitchWitnesses, + } + default: + resp = &pdpb.RegionHeartbeatResponse{ + Header: &pdpb.ResponseHeader{ClusterId: s.clusterID}, + RegionId: region.GetID(), + RegionEpoch: region.GetRegionEpoch(), + TargetPeer: region.GetLeader(), + ChangePeer: op.ChangePeer, + TransferLeader: op.TransferLeader, + Merge: op.Merge, + SplitRegion: op.SplitRegion, + ChangePeerV2: op.ChangePeerV2, + SwitchWitnesses: op.SwitchWitnesses, + } + } select { - case s.msgCh <- msg: + case s.msgCh <- resp: case <-s.hbStreamCtx.Done(): } } diff --git a/pkg/schedule/operator/step.go b/pkg/schedule/operator/step.go index 0a09ab92773..089b166d11f 100644 --- a/pkg/schedule/operator/step.go +++ b/pkg/schedule/operator/step.go @@ -28,6 +28,7 @@ import ( "github.com/tikv/pd/pkg/core" "github.com/tikv/pd/pkg/core/storelimit" "github.com/tikv/pd/pkg/schedule/config" + "github.com/tikv/pd/pkg/schedule/hbstream" "github.com/tikv/pd/pkg/utils/typeutil" "go.uber.org/zap" ) @@ -57,7 +58,7 @@ type OpStep interface { CheckInProgress(ci *core.BasicCluster, config config.SharedConfigProvider, region *core.RegionInfo) error Influence(opInfluence OpInfluence, region *core.RegionInfo) Timeout(regionSize int64) time.Duration - GetCmd(region *core.RegionInfo, useConfChangeV2 bool) *pdpb.RegionHeartbeatResponse + GetCmd(region *core.RegionInfo, useConfChangeV2 bool) *hbstream.Operation } // TransferLeader is an OpStep that transfers a region's leader. @@ -126,12 +127,12 @@ func (tl TransferLeader) Timeout(regionSize int64) time.Duration { } // GetCmd returns the schedule command for heartbeat response. 
-func (tl TransferLeader) GetCmd(region *core.RegionInfo, useConfChangeV2 bool) *pdpb.RegionHeartbeatResponse { +func (tl TransferLeader) GetCmd(region *core.RegionInfo, useConfChangeV2 bool) *hbstream.Operation { peers := make([]*metapb.Peer, 0, len(tl.ToStores)) for _, storeID := range tl.ToStores { peers = append(peers, region.GetStorePeer(storeID)) } - return &pdpb.RegionHeartbeatResponse{ + return &hbstream.Operation{ TransferLeader: &pdpb.TransferLeader{ Peer: region.GetStorePeer(tl.ToStore), Peers: peers, @@ -210,7 +211,7 @@ func (ap AddPeer) Timeout(regionSize int64) time.Duration { } // GetCmd returns the schedule command for heartbeat response. -func (ap AddPeer) GetCmd(region *core.RegionInfo, useConfChangeV2 bool) *pdpb.RegionHeartbeatResponse { +func (ap AddPeer) GetCmd(region *core.RegionInfo, useConfChangeV2 bool) *hbstream.Operation { peer := region.GetStorePeer(ap.ToStore) if peer != nil { // The newly added peer is pending. @@ -274,7 +275,7 @@ func (bw BecomeWitness) Timeout(regionSize int64) time.Duration { } // GetCmd returns the schedule command for heartbeat response. -func (bw BecomeWitness) GetCmd(_ *core.RegionInfo, _ bool) *pdpb.RegionHeartbeatResponse { +func (bw BecomeWitness) GetCmd(_ *core.RegionInfo, _ bool) *hbstream.Operation { return switchWitness(bw.PeerID, true) } @@ -342,7 +343,7 @@ func (bn BecomeNonWitness) Timeout(regionSize int64) time.Duration { } // GetCmd returns the schedule command for heartbeat response. -func (bn BecomeNonWitness) GetCmd(region *core.RegionInfo, useConfChangeV2 bool) *pdpb.RegionHeartbeatResponse { +func (bn BecomeNonWitness) GetCmd(region *core.RegionInfo, useConfChangeV2 bool) *hbstream.Operation { return switchWitness(bn.PeerID, false) } @@ -426,7 +427,7 @@ func (bsw BatchSwitchWitness) Timeout(regionSize int64) time.Duration { } // GetCmd returns the schedule command for heartbeat response. -func (bsw BatchSwitchWitness) GetCmd(region *core.RegionInfo, useConfChangeV2 bool) *pdpb.RegionHeartbeatResponse { +func (bsw BatchSwitchWitness) GetCmd(region *core.RegionInfo, useConfChangeV2 bool) *hbstream.Operation { switches := make([]*pdpb.SwitchWitness, 0, len(bsw.ToWitnesses)+len(bsw.ToNonWitnesses)) for _, w := range bsw.ToWitnesses { switches = append(switches, w.GetCmd(region, useConfChangeV2).SwitchWitnesses.SwitchWitnesses...) @@ -434,7 +435,7 @@ func (bsw BatchSwitchWitness) GetCmd(region *core.RegionInfo, useConfChangeV2 bo for _, nw := range bsw.ToNonWitnesses { switches = append(switches, nw.GetCmd(region, useConfChangeV2).SwitchWitnesses.SwitchWitnesses...) } - return &pdpb.RegionHeartbeatResponse{ + return &hbstream.Operation{ SwitchWitnesses: &pdpb.BatchSwitchWitness{ SwitchWitnesses: switches, }, @@ -522,7 +523,7 @@ func (al AddLearner) Timeout(regionSize int64) time.Duration { } // GetCmd returns the schedule command for heartbeat response. -func (al AddLearner) GetCmd(region *core.RegionInfo, useConfChangeV2 bool) *pdpb.RegionHeartbeatResponse { +func (al AddLearner) GetCmd(region *core.RegionInfo, useConfChangeV2 bool) *hbstream.Operation { if region.GetStorePeer(al.ToStore) != nil { // The newly added peer is pending. return nil @@ -581,7 +582,7 @@ func (pl PromoteLearner) Timeout(regionSize int64) time.Duration { } // GetCmd returns the schedule command for heartbeat response. 
-func (pl PromoteLearner) GetCmd(_ *core.RegionInfo, useConfChangeV2 bool) *pdpb.RegionHeartbeatResponse { +func (pl PromoteLearner) GetCmd(_ *core.RegionInfo, useConfChangeV2 bool) *hbstream.Operation { return createResponse(addNode(pl.PeerID, pl.ToStore, pl.IsWitness), useConfChangeV2) } @@ -647,7 +648,7 @@ func (rp RemovePeer) Timeout(regionSize int64) time.Duration { } // GetCmd returns the schedule command for heartbeat response. -func (rp RemovePeer) GetCmd(region *core.RegionInfo, useConfChangeV2 bool) *pdpb.RegionHeartbeatResponse { +func (rp RemovePeer) GetCmd(region *core.RegionInfo, useConfChangeV2 bool) *hbstream.Operation { return createResponse(&pdpb.ChangePeer{ ChangeType: eraftpb.ConfChangeType_RemoveNode, Peer: region.GetStorePeer(rp.FromStore), @@ -709,11 +710,11 @@ func (mr MergeRegion) Timeout(regionSize int64) time.Duration { } // GetCmd returns the schedule command for heartbeat response. -func (mr MergeRegion) GetCmd(region *core.RegionInfo, useConfChangeV2 bool) *pdpb.RegionHeartbeatResponse { +func (mr MergeRegion) GetCmd(region *core.RegionInfo, useConfChangeV2 bool) *hbstream.Operation { if mr.IsPassive { return nil } - return &pdpb.RegionHeartbeatResponse{ + return &hbstream.Operation{ Merge: &pdpb.Merge{ Target: mr.ToRegion, }, @@ -763,8 +764,8 @@ func (sr SplitRegion) Timeout(regionSize int64) time.Duration { } // GetCmd returns the schedule command for heartbeat response. -func (sr SplitRegion) GetCmd(region *core.RegionInfo, useConfChangeV2 bool) *pdpb.RegionHeartbeatResponse { - return &pdpb.RegionHeartbeatResponse{ +func (sr SplitRegion) GetCmd(region *core.RegionInfo, useConfChangeV2 bool) *hbstream.Operation { + return &hbstream.Operation{ SplitRegion: &pdpb.SplitRegion{ Policy: sr.Policy, Keys: sr.SplitKeys, @@ -813,7 +814,7 @@ func (dv DemoteVoter) Timeout(regionSize int64) time.Duration { } // GetCmd returns the schedule command for heartbeat response. -func (dv DemoteVoter) GetCmd(_ *core.RegionInfo, useConfChangeV2 bool) *pdpb.RegionHeartbeatResponse { +func (dv DemoteVoter) GetCmd(_ *core.RegionInfo, useConfChangeV2 bool) *hbstream.Operation { return createResponse(addLearnerNode(dv.PeerID, dv.ToStore, dv.IsWitness), useConfChangeV2) } @@ -935,7 +936,7 @@ func (cpe ChangePeerV2Enter) Timeout(regionSize int64) time.Duration { } // GetCmd returns the schedule command for heartbeat response. -func (cpe ChangePeerV2Enter) GetCmd(region *core.RegionInfo, useConfChangeV2 bool) *pdpb.RegionHeartbeatResponse { +func (cpe ChangePeerV2Enter) GetCmd(region *core.RegionInfo, useConfChangeV2 bool) *hbstream.Operation { if !useConfChangeV2 { // only supported in ChangePeerV2 return nil @@ -947,7 +948,7 @@ func (cpe ChangePeerV2Enter) GetCmd(region *core.RegionInfo, useConfChangeV2 boo for _, dv := range cpe.DemoteVoters { changes = append(changes, dv.GetCmd(region, useConfChangeV2).ChangePeerV2.Changes...) } - return &pdpb.RegionHeartbeatResponse{ + return &hbstream.Operation{ ChangePeerV2: &pdpb.ChangePeerV2{ Changes: changes, }, @@ -1075,12 +1076,12 @@ func (cpl ChangePeerV2Leave) Timeout(regionSize int64) time.Duration { } // GetCmd returns the schedule command for heartbeat response. 
-func (cpl ChangePeerV2Leave) GetCmd(region *core.RegionInfo, useConfChangeV2 bool) *pdpb.RegionHeartbeatResponse { +func (cpl ChangePeerV2Leave) GetCmd(region *core.RegionInfo, useConfChangeV2 bool) *hbstream.Operation { if !useConfChangeV2 { // only supported in ChangePeerV2 return nil } - return &pdpb.RegionHeartbeatResponse{ + return &hbstream.Operation{ ChangePeerV2: &pdpb.ChangePeerV2{}, } } @@ -1138,21 +1139,21 @@ func addLearnerNode(id, storeID uint64, isWitness bool) *pdpb.ChangePeer { } } -func createResponse(change *pdpb.ChangePeer, useConfChangeV2 bool) *pdpb.RegionHeartbeatResponse { +func createResponse(change *pdpb.ChangePeer, useConfChangeV2 bool) *hbstream.Operation { if useConfChangeV2 { - return &pdpb.RegionHeartbeatResponse{ + return &hbstream.Operation{ ChangePeerV2: &pdpb.ChangePeerV2{ Changes: []*pdpb.ChangePeer{change}, }, } } - return &pdpb.RegionHeartbeatResponse{ + return &hbstream.Operation{ ChangePeer: change, } } -func switchWitness(peerID uint64, isWitness bool) *pdpb.RegionHeartbeatResponse { - return &pdpb.RegionHeartbeatResponse{ +func switchWitness(peerID uint64, isWitness bool) *hbstream.Operation { + return &hbstream.Operation{ SwitchWitnesses: &pdpb.BatchSwitchWitness{ SwitchWitnesses: []*pdpb.SwitchWitness{{PeerId: peerID, IsWitness: isWitness}}, }, diff --git a/server/api/stats.go b/server/api/stats.go index e8b04ba588e..1798597b6cc 100644 --- a/server/api/stats.go +++ b/server/api/stats.go @@ -48,7 +48,7 @@ func (h *statsHandler) GetRegionStatus(w http.ResponseWriter, r *http.Request) { if r.URL.Query().Has("count") { stats = rc.GetRegionCount([]byte(startKey), []byte(endKey)) } else { - stats = rc.GetRegionStats([]byte(startKey), []byte(endKey)) + stats = rc.GetRegionStatsByRange([]byte(startKey), []byte(endKey)) } h.rd.JSON(w, http.StatusOK, stats) } diff --git a/server/cluster/cluster.go b/server/cluster/cluster.go index 047ff1018f5..46f0094ee49 100644 --- a/server/cluster/cluster.go +++ b/server/cluster/cluster.go @@ -33,6 +33,7 @@ import ( "github.com/pingcap/kvproto/pkg/metapb" "github.com/pingcap/kvproto/pkg/pdpb" "github.com/pingcap/log" + "github.com/tikv/pd/pkg/cluster" "github.com/tikv/pd/pkg/core" "github.com/tikv/pd/pkg/core/storelimit" "github.com/tikv/pd/pkg/errs" @@ -892,11 +893,21 @@ func (c *RaftCluster) GetSuspectRegions() []uint64 { return c.coordinator.GetCheckerController().GetSuspectRegions() } -// GetHotStat gets hot stat for test. +// GetHotStat gets hot stat. func (c *RaftCluster) GetHotStat() *statistics.HotStat { return c.hotStat } +// GetRegionStats gets region statistics. +func (c *RaftCluster) GetRegionStats() *statistics.RegionStatistics { + return c.regionStats +} + +// GetLabelStats gets label statistics. +func (c *RaftCluster) GetLabelStats() *statistics.LabelStatistics { + return c.labelLevelStats +} + // RemoveSuspectRegion removes region from suspect list. 
func (c *RaftCluster) RemoveSuspectRegion(id uint64) { c.coordinator.GetCheckerController().RemoveSuspectRegion(id) @@ -1095,15 +1106,7 @@ func (c *RaftCluster) processRegionHeartbeat(region *core.RegionInfo) error { region.Inherit(origin, c.GetStoreConfig().IsEnableRegionBucket()) if !c.isAPIServiceMode { - c.hotStat.CheckWriteAsync(statistics.NewCheckExpiredItemTask(region)) - c.hotStat.CheckReadAsync(statistics.NewCheckExpiredItemTask(region)) - reportInterval := region.GetInterval() - interval := reportInterval.GetEndTimestamp() - reportInterval.GetStartTimestamp() - for _, peer := range region.GetPeers() { - peerInfo := core.NewPeerInfo(peer, region.GetWriteLoads(), interval) - c.hotStat.CheckWriteAsync(statistics.NewCheckPeerTask(peerInfo, region)) - } - c.coordinator.GetSchedulersController().CheckTransferWitnessLeader(region) + cluster.HandleStatsAsync(c, region) } hasRegionStats := c.regionStats != nil @@ -1136,27 +1139,16 @@ func (c *RaftCluster) processRegionHeartbeat(region *core.RegionInfo) error { if overlaps, err = c.core.AtomicCheckAndPutRegion(region); err != nil { return err } - - for _, item := range overlaps { - if !c.isAPIServiceMode { - if c.regionStats != nil { - c.regionStats.ClearDefunctRegion(item.GetID()) - } - c.labelLevelStats.ClearDefunctRegion(item.GetID()) - } - c.ruleManager.InvalidCache(item.GetID()) + if !c.isAPIServiceMode { + cluster.HandleOverlaps(c, overlaps) } regionUpdateCacheEventCounter.Inc() } if !c.isAPIServiceMode { - if hasRegionStats { - c.regionStats.Observe(region, c.getRegionStoresLocked(region)) - } - } - if !c.IsPrepared() && isNew { - c.coordinator.GetPrepareChecker().Collect(region) + cluster.Collect(c, region, c.GetRegionStores(region), hasRegionStats, isNew, c.IsPrepared()) } + if c.storage != nil { // If there are concurrent heartbeats from the same region, the last write will win even if // writes to storage in the critical area. So don't use mutex to protect it. @@ -2329,8 +2321,8 @@ func (c *RaftCluster) PutMetaCluster(meta *metapb.Cluster) error { return c.putMetaLocked(typeutil.DeepClone(meta, core.ClusterFactory)) } -// GetRegionStats returns region statistics from cluster. -func (c *RaftCluster) GetRegionStats(startKey, endKey []byte) *statistics.RegionStats { +// GetRegionStatsByRange returns region statistics from cluster. 
+func (c *RaftCluster) GetRegionStatsByRange(startKey, endKey []byte) *statistics.RegionStats { return statistics.GetRegionStats(c.core.ScanRegions(startKey, endKey, -1)) } diff --git a/server/cluster/cluster_test.go b/server/cluster/cluster_test.go index 75ff8c50207..98871ece96c 100644 --- a/server/cluster/cluster_test.go +++ b/server/cluster/cluster_test.go @@ -3582,7 +3582,7 @@ func TestInterval(t *testing.T) { func waitAddLearner(re *require.Assertions, stream mockhbstream.HeartbeatStream, region *core.RegionInfo, storeID uint64) *core.RegionInfo { var res *pdpb.RegionHeartbeatResponse testutil.Eventually(re, func() bool { - if res = stream.Recv(); res != nil { + if res = stream.Recv().(*pdpb.RegionHeartbeatResponse); res != nil { return res.GetRegionId() == region.GetID() && res.GetChangePeer().GetChangeType() == eraftpb.ConfChangeType_AddLearnerNode && res.GetChangePeer().GetPeer().GetStoreId() == storeID @@ -3598,7 +3598,7 @@ func waitAddLearner(re *require.Assertions, stream mockhbstream.HeartbeatStream, func waitPromoteLearner(re *require.Assertions, stream mockhbstream.HeartbeatStream, region *core.RegionInfo, storeID uint64) *core.RegionInfo { var res *pdpb.RegionHeartbeatResponse testutil.Eventually(re, func() bool { - if res = stream.Recv(); res != nil { + if res = stream.Recv().(*pdpb.RegionHeartbeatResponse); res != nil { return res.GetRegionId() == region.GetID() && res.GetChangePeer().GetChangeType() == eraftpb.ConfChangeType_AddNode && res.GetChangePeer().GetPeer().GetStoreId() == storeID @@ -3615,7 +3615,7 @@ func waitPromoteLearner(re *require.Assertions, stream mockhbstream.HeartbeatStr func waitRemovePeer(re *require.Assertions, stream mockhbstream.HeartbeatStream, region *core.RegionInfo, storeID uint64) *core.RegionInfo { var res *pdpb.RegionHeartbeatResponse testutil.Eventually(re, func() bool { - if res = stream.Recv(); res != nil { + if res = stream.Recv().(*pdpb.RegionHeartbeatResponse); res != nil { return res.GetRegionId() == region.GetID() && res.GetChangePeer().GetChangeType() == eraftpb.ConfChangeType_RemoveNode && res.GetChangePeer().GetPeer().GetStoreId() == storeID @@ -3631,7 +3631,7 @@ func waitRemovePeer(re *require.Assertions, stream mockhbstream.HeartbeatStream, func waitTransferLeader(re *require.Assertions, stream mockhbstream.HeartbeatStream, region *core.RegionInfo, storeID uint64) *core.RegionInfo { var res *pdpb.RegionHeartbeatResponse testutil.Eventually(re, func() bool { - if res = stream.Recv(); res != nil { + if res = stream.Recv().(*pdpb.RegionHeartbeatResponse); res != nil { if res.GetRegionId() == region.GetID() { for _, peer := range append(res.GetTransferLeader().GetPeers(), res.GetTransferLeader().GetPeer()) { if peer.GetStoreId() == storeID { diff --git a/server/grpc_service.go b/server/grpc_service.go index f318061f89a..fa5fdbf9103 100644 --- a/server/grpc_service.go +++ b/server/grpc_service.go @@ -1044,14 +1044,14 @@ type heartbeatServer struct { closed int32 } -func (s *heartbeatServer) Send(m *pdpb.RegionHeartbeatResponse) error { +func (s *heartbeatServer) Send(m core.RegionHeartbeatResponse) error { if atomic.LoadInt32(&s.closed) == 1 { return io.EOF } done := make(chan error, 1) go func() { defer logutil.LogPanic() - done <- s.stream.Send(m) + done <- s.stream.Send(m.(*pdpb.RegionHeartbeatResponse)) }() timer := time.NewTimer(heartbeatSendTimeout) defer timer.Stop() @@ -1183,6 +1183,9 @@ func (s *GrpcServer) RegionHeartbeat(stream pdpb.PD_RegionHeartbeatServer) error lastForwardedHost string lastBind time.Time errCh chan 
error + schedulingStream schedulingpb.Scheduling_RegionHeartbeatClient + cancel1 context.CancelFunc + lastPrimaryAddr string ) defer func() { // cancel the forward stream @@ -1296,6 +1299,55 @@ func (s *GrpcServer) RegionHeartbeat(stream pdpb.PD_RegionHeartbeatServer) error s.hbStreams.SendErr(pdpb.ErrorType_UNKNOWN, msg, request.GetLeader()) continue } + + if s.IsAPIServiceMode() { + ctx := stream.Context() + primaryAddr, _ := s.GetServicePrimaryAddr(ctx, utils.SchedulingServiceName) + if schedulingStream == nil || lastPrimaryAddr != primaryAddr { + if cancel1 != nil { + cancel1() + } + client, err := s.getDelegateClient(ctx, primaryAddr) + if err != nil { + log.Error("get delegate client failed", zap.Error(err)) + } + + log.Info("create region heartbeat forward stream", zap.String("forwarded-host", primaryAddr)) + schedulingStream, cancel1, err = s.createSchedulingStream(client) + if err != nil { + log.Error("create region heartbeat forward stream failed", zap.Error(err)) + } else { + lastPrimaryAddr = primaryAddr + errCh = make(chan error, 1) + go forwardSchedulingToServer(schedulingStream, server, errCh) + } + } + if schedulingStream != nil { + req := &schedulingpb.RegionHeartbeatRequest{ + Header: &schedulingpb.RequestHeader{ + ClusterId: request.GetHeader().GetClusterId(), + SenderId: request.GetHeader().GetSenderId(), + }, + Region: request.GetRegion(), + Leader: request.GetLeader(), + DownPeers: request.GetDownPeers(), + PendingPeers: request.GetPendingPeers(), + BytesWritten: request.GetBytesWritten(), + BytesRead: request.GetBytesRead(), + KeysWritten: request.GetKeysWritten(), + KeysRead: request.GetKeysRead(), + ApproximateSize: request.GetApproximateSize(), + ApproximateKeys: request.GetApproximateKeys(), + Interval: request.GetInterval(), + Term: request.GetTerm(), + QueryStats: request.GetQueryStats(), + } + if err := schedulingStream.Send(req); err != nil { + log.Error("forward region heartbeat failed", zap.Error(err)) + } + } + } + regionHeartbeatHandleDuration.WithLabelValues(storeAddress, storeLabel).Observe(time.Since(start).Seconds()) regionHeartbeatCounter.WithLabelValues(storeAddress, storeLabel, "report", "ok").Inc() } @@ -2185,6 +2237,47 @@ func forwardRegionHeartbeatClientToServer(forwardStream pdpb.PD_RegionHeartbeatC } } +func (s *GrpcServer) createSchedulingStream(client *grpc.ClientConn) (schedulingpb.Scheduling_RegionHeartbeatClient, context.CancelFunc, error) { + done := make(chan struct{}) + ctx, cancel := context.WithCancel(s.ctx) + go grpcutil.CheckStream(ctx, cancel, done) + forwardStream, err := schedulingpb.NewSchedulingClient(client).RegionHeartbeat(ctx) + done <- struct{}{} + return forwardStream, cancel, err +} + +func forwardSchedulingToServer(forwardStream schedulingpb.Scheduling_RegionHeartbeatClient, server *heartbeatServer, errCh chan error) { + defer logutil.LogPanic() + defer close(errCh) + for { + resp, err := forwardStream.Recv() + if err != nil { + errCh <- errors.WithStack(err) + return + } + response := &pdpb.RegionHeartbeatResponse{ + Header: &pdpb.ResponseHeader{ + ClusterId: resp.GetHeader().GetClusterId(), + // ignore error here + }, + ChangePeer: resp.GetChangePeer(), + TransferLeader: resp.GetTransferLeader(), + RegionId: resp.GetRegionId(), + RegionEpoch: resp.GetRegionEpoch(), + TargetPeer: resp.GetTargetPeer(), + Merge: resp.GetMerge(), + SplitRegion: resp.GetSplitRegion(), + ChangePeerV2: resp.GetChangePeerV2(), + SwitchWitnesses: resp.GetSwitchWitnesses(), + } + + if err := server.Send(response); err != nil { + errCh <- 
errors.WithStack(err) + return + } + } +} + func (s *GrpcServer) createTSOForwardStream( ctx context.Context, client *grpc.ClientConn, ) (tsopb.TSO_TsoClient, context.Context, context.CancelFunc, error) { diff --git a/server/server.go b/server/server.go index 36919805d40..854350ebead 100644 --- a/server/server.go +++ b/server/server.go @@ -470,7 +470,7 @@ func (s *Server) startServer(ctx context.Context) error { } s.keyspaceManager = keyspace.NewKeyspaceManager(s.ctx, s.storage, s.cluster, keyspaceIDAllocator, &s.cfg.Keyspace, s.keyspaceGroupManager) s.safePointV2Manager = gc.NewSafePointManagerV2(s.ctx, s.storage, s.storage, s.storage) - s.hbStreams = hbstream.NewHeartbeatStreams(ctx, s.clusterID, s.cluster) + s.hbStreams = hbstream.NewHeartbeatStreams(ctx, s.clusterID, "", s.cluster) // initial hot_region_storage in here. s.hotRegionStorage, err = storage.NewHotRegionsStorage( ctx, filepath.Join(s.cfg.DataDir, "hot-region"), s.encryptionKeyManager, s.handler) diff --git a/tests/integrations/mcs/go.mod b/tests/integrations/mcs/go.mod index a5f0de110e5..721a3f7b333 100644 --- a/tests/integrations/mcs/go.mod +++ b/tests/integrations/mcs/go.mod @@ -11,6 +11,7 @@ replace ( replace google.golang.org/grpc v1.54.0 => google.golang.org/grpc v1.26.0 require ( + github.com/docker/go-units v0.4.0 github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00 github.com/pingcap/kvproto v0.0.0-20230925123611-87bebcc0d071 github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 @@ -56,7 +57,6 @@ require ( github.com/coreos/go-systemd v0.0.0-20190719114852-fd7a80b32e1f // indirect github.com/coreos/pkg v0.0.0-20180928190104-399ea9e2e55f // indirect github.com/davecgh/go-spew v1.1.1 // indirect - github.com/docker/go-units v0.4.0 // indirect github.com/dustin/go-humanize v0.0.0-20171111073723-bb3d318650d4 // indirect github.com/elastic/gosigar v0.14.2 // indirect github.com/elliotchance/pie/v2 v2.1.0 // indirect diff --git a/tests/integrations/mcs/scheduling/config_test.go b/tests/integrations/mcs/scheduling/config_test.go index 0cc3905b3a5..3585980e4fb 100644 --- a/tests/integrations/mcs/scheduling/config_test.go +++ b/tests/integrations/mcs/scheduling/config_test.go @@ -133,7 +133,6 @@ func persistConfig(re *require.Assertions, pdLeaderServer *tests.TestServer) { func (suite *configTestSuite) TestSchedulerConfigWatch() { re := suite.Require() - // Make sure the config is persisted before the watcher is created. persistConfig(re, suite.pdLeaderServer) // Create a config watcher. @@ -149,7 +148,7 @@ func (suite *configTestSuite) TestSchedulerConfigWatch() { // Get all default scheduler names. var namesFromAPIServer []string testutil.Eventually(re, func() bool { - namesFromAPIServer, _, _ = suite.pdLeaderServer.GetRaftCluster().GetStorage().LoadAllScheduleConfig() + namesFromAPIServer, _, _ = suite.pdLeaderServer.GetRaftCluster().GetStorage().LoadAllSchedulerConfigs() return len(namesFromAPIServer) == len(sc.DefaultSchedulers) }) // Check all default schedulers' configs. 
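Stepping back to server/grpc_service.go above: when PD runs in API-service mode it now proxies region heartbeats, re-sending each incoming pdpb request on a schedulingpb stream to the scheduling primary while forwardSchedulingToServer copies the scheduling server's responses back onto the TiKV-facing stream and reports the first failure on errCh. The response-pumping half of that bridge, reduced to a self-contained, runnable sketch with toy channel-backed streams (none of these types are PD API):

    package main

    import (
        "fmt"
        "io"
    )

    // chanStream is a toy bidirectional stream standing in for the two
    // generated gRPC streams being bridged.
    type chanStream struct {
        in, out chan string
    }

    func (s chanStream) Send(m string) error { s.out <- m; return nil }
    func (s chanStream) Recv() (string, error) {
        m, ok := <-s.in
        if !ok {
            return "", io.EOF
        }
        return m, nil
    }

    type receiver interface{ Recv() (string, error) }
    type sender interface{ Send(string) error }

    // forwardResponses copies messages from the scheduling-facing stream back to
    // the client-facing stream until either side fails, reporting the first
    // error -- the same loop shape as forwardSchedulingToServer.
    func forwardResponses(from receiver, to sender, errCh chan<- error) {
        defer close(errCh)
        for {
            resp, err := from.Recv()
            if err != nil {
                errCh <- err
                return
            }
            if err := to.Send(resp); err != nil {
                errCh <- err
                return
            }
        }
    }

    func main() {
        sched := chanStream{in: make(chan string, 1), out: make(chan string, 1)}
        tikv := chanStream{in: make(chan string, 1), out: make(chan string, 1)}
        errCh := make(chan error, 1)
        go forwardResponses(sched, tikv, errCh)

        sched.in <- "transfer-leader to store 2" // a response from the scheduling primary
        fmt.Println(<-tikv.out)                  // ...lands on the TiKV-facing stream
        close(sched.in)                          // the forward stream ends
        fmt.Println(<-errCh)                     // io.EOF surfaces to the caller
    }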
diff --git a/tests/integrations/mcs/scheduling/rule_test.go b/tests/integrations/mcs/scheduling/rule_test.go index 68347366378..2028214be73 100644 --- a/tests/integrations/mcs/scheduling/rule_test.go +++ b/tests/integrations/mcs/scheduling/rule_test.go @@ -19,14 +19,9 @@ import ( "sort" "testing" - "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" - "github.com/tikv/pd/pkg/keyspace" - "github.com/tikv/pd/pkg/mcs/scheduling/server/rule" - "github.com/tikv/pd/pkg/mcs/utils" "github.com/tikv/pd/pkg/schedule/labeler" "github.com/tikv/pd/pkg/schedule/placement" - "github.com/tikv/pd/pkg/storage/endpoint" "github.com/tikv/pd/pkg/utils/testutil" "github.com/tikv/pd/tests" ) @@ -40,7 +35,8 @@ type ruleTestSuite struct { // The PD cluster. cluster *tests.TestCluster // pdLeaderServer is the leader server of the PD cluster. - pdLeaderServer *tests.TestServer + pdLeaderServer *tests.TestServer + backendEndpoint string } func TestRule(t *testing.T) { @@ -58,6 +54,7 @@ func (suite *ruleTestSuite) SetupSuite() { re.NoError(err) leaderName := suite.cluster.WaitLeader() suite.pdLeaderServer = suite.cluster.GetServer(leaderName) + suite.backendEndpoint = suite.pdLeaderServer.GetAddr() re.NoError(suite.pdLeaderServer.BootstrapCluster()) } @@ -66,49 +63,18 @@ func (suite *ruleTestSuite) TearDownSuite() { suite.cluster.Destroy() } -func loadRules(re *require.Assertions, ruleStorage endpoint.RuleStorage) (rules []*placement.Rule) { - err := ruleStorage.LoadRules(func(_, v string) { - r, err := placement.NewRuleFromJSON([]byte(v)) - re.NoError(err) - rules = append(rules, r) - }) - re.NoError(err) - return -} - -func loadRuleGroups(re *require.Assertions, ruleStorage endpoint.RuleStorage) (groups []*placement.RuleGroup) { - err := ruleStorage.LoadRuleGroups(func(_, v string) { - rg, err := placement.NewRuleGroupFromJSON([]byte(v)) - re.NoError(err) - groups = append(groups, rg) - }) - re.NoError(err) - return -} - -func loadRegionRules(re *require.Assertions, ruleStorage endpoint.RuleStorage) (rules []*labeler.LabelRule) { - err := ruleStorage.LoadRegionRules(func(_, v string) { - lr, err := labeler.NewLabelRuleFromJSON([]byte(v)) - re.NoError(err) - rules = append(rules, lr) - }) - re.NoError(err) - return -} - func (suite *ruleTestSuite) TestRuleWatch() { re := suite.Require() - // Create a rule watcher. - watcher, err := rule.NewWatcher( - suite.ctx, - suite.pdLeaderServer.GetEtcdClient(), - suite.cluster.GetCluster().GetId(), - ) + tc, err := tests.NewTestSchedulingCluster(suite.ctx, 1, suite.backendEndpoint) re.NoError(err) - ruleStorage := watcher.GetRuleStorage() - // Check the default rule. - rules := loadRules(re, ruleStorage) + defer tc.Destroy() + + tc.WaitForPrimaryServing(re) + cluster := tc.GetPrimaryServer().GetCluster() + ruleManager := cluster.GetRuleManager() + // Check the default rule and rule group. + rules := ruleManager.GetAllRules() re.Len(rules, 1) re.Equal("pd", rules[0].GroupID) re.Equal("default", rules[0].ID) @@ -117,24 +83,25 @@ func (suite *ruleTestSuite) TestRuleWatch() { re.Empty(rules[0].EndKey) re.Equal(placement.Voter, rules[0].Role) re.Empty(rules[0].LocationLabels) - // Check the empty rule group. - ruleGroups := loadRuleGroups(re, ruleStorage) - re.NoError(err) - re.Empty(ruleGroups) + ruleGroups := ruleManager.GetRuleGroups() + re.Len(ruleGroups, 1) + re.Equal("pd", ruleGroups[0].ID) + re.Equal(0, ruleGroups[0].Index) + re.False(ruleGroups[0].Override) // Set a new rule via the PD API server. 
- ruleManager := suite.pdLeaderServer.GetRaftCluster().GetRuleManager() + apiRuleManager := suite.pdLeaderServer.GetRaftCluster().GetRuleManager() rule := &placement.Rule{ GroupID: "2", ID: "3", - Role: "voter", + Role: placement.Voter, Count: 1, StartKeyHex: "22", EndKeyHex: "dd", } - err = ruleManager.SetRule(rule) + err = apiRuleManager.SetRule(rule) re.NoError(err) testutil.Eventually(re, func() bool { - rules = loadRules(re, ruleStorage) + rules = ruleManager.GetAllRules() return len(rules) == 2 }) sort.Slice(rules, func(i, j int) bool { @@ -148,10 +115,10 @@ func (suite *ruleTestSuite) TestRuleWatch() { re.Equal(rule.StartKeyHex, rules[1].StartKeyHex) re.Equal(rule.EndKeyHex, rules[1].EndKeyHex) // Delete the rule. - err = ruleManager.DeleteRule(rule.GroupID, rule.ID) + err = apiRuleManager.DeleteRule(rule.GroupID, rule.ID) re.NoError(err) testutil.Eventually(re, func() bool { - rules = loadRules(re, ruleStorage) + rules = ruleManager.GetAllRules() return len(rules) == 1 }) re.Len(rules, 1) @@ -162,30 +129,35 @@ func (suite *ruleTestSuite) TestRuleWatch() { Index: 100, Override: true, } - err = ruleManager.SetRuleGroup(ruleGroup) + err = apiRuleManager.SetRuleGroup(ruleGroup) re.NoError(err) testutil.Eventually(re, func() bool { - ruleGroups = loadRuleGroups(re, ruleStorage) - return len(ruleGroups) == 1 + ruleGroups = ruleManager.GetRuleGroups() + return len(ruleGroups) == 2 }) - re.Len(ruleGroups, 1) - re.Equal(ruleGroup.ID, ruleGroups[0].ID) - re.Equal(ruleGroup.Index, ruleGroups[0].Index) - re.Equal(ruleGroup.Override, ruleGroups[0].Override) + re.Len(ruleGroups, 2) + re.Equal(ruleGroup.ID, ruleGroups[1].ID) + re.Equal(ruleGroup.Index, ruleGroups[1].Index) + re.Equal(ruleGroup.Override, ruleGroups[1].Override) // Delete the rule group. - err = ruleManager.DeleteRuleGroup(ruleGroup.ID) + err = apiRuleManager.DeleteRuleGroup(ruleGroup.ID) re.NoError(err) testutil.Eventually(re, func() bool { - ruleGroups = loadRuleGroups(re, ruleStorage) - return len(ruleGroups) == 0 + ruleGroups = ruleManager.GetRuleGroups() + return len(ruleGroups) == 1 }) - re.Empty(ruleGroups) + re.Len(ruleGroups, 1) // Test the region label rule watch. - labelRules := loadRegionRules(re, ruleStorage) - re.Len(labelRules, 1) - defaultKeyspaceRule := keyspace.MakeLabelRule(utils.DefaultKeyspaceID) - re.Equal(defaultKeyspaceRule, labelRules[0]) + regionLabeler := cluster.GetRegionLabeler() + labelRules := regionLabeler.GetAllLabelRules() + apiRegionLabeler := suite.pdLeaderServer.GetRaftCluster().GetRegionLabeler() + apiLabelRules := apiRegionLabeler.GetAllLabelRules() + re.Len(labelRules, len(apiLabelRules)) + re.Equal(apiLabelRules[0].ID, labelRules[0].ID) + re.Equal(apiLabelRules[0].Index, labelRules[0].Index) + re.Equal(apiLabelRules[0].Labels, labelRules[0].Labels) + re.Equal(apiLabelRules[0].RuleType, labelRules[0].RuleType) // Set a new region label rule. 
labelRule := &labeler.LabelRule{ ID: "rule1", @@ -193,11 +165,10 @@ func (suite *ruleTestSuite) TestRuleWatch() { RuleType: "key-range", Data: labeler.MakeKeyRanges("1234", "5678"), } - regionLabeler := suite.pdLeaderServer.GetRaftCluster().GetRegionLabeler() - err = regionLabeler.SetLabelRule(labelRule) + err = apiRegionLabeler.SetLabelRule(labelRule) re.NoError(err) testutil.Eventually(re, func() bool { - labelRules = loadRegionRules(re, ruleStorage) + labelRules = regionLabeler.GetAllLabelRules() return len(labelRules) == 2 }) sort.Slice(labelRules, func(i, j int) bool { @@ -218,17 +189,16 @@ func (suite *ruleTestSuite) TestRuleWatch() { SetRules: []*labeler.LabelRule{labelRule}, DeleteRules: []string{"rule1"}, } - err = regionLabeler.Patch(patch) + err = apiRegionLabeler.Patch(patch) re.NoError(err) testutil.Eventually(re, func() bool { - labelRules = loadRegionRules(re, ruleStorage) + labelRules = regionLabeler.GetAllLabelRules() return len(labelRules) == 2 }) sort.Slice(labelRules, func(i, j int) bool { return labelRules[i].ID < labelRules[j].ID }) re.Len(labelRules, 2) - re.Equal(defaultKeyspaceRule, labelRules[0]) re.Equal(labelRule.ID, labelRules[1].ID) re.Equal(labelRule.Labels, labelRules[1].Labels) re.Equal(labelRule.RuleType, labelRules[1].RuleType) diff --git a/tests/integrations/mcs/scheduling/server_test.go b/tests/integrations/mcs/scheduling/server_test.go index e469c593b84..45c25f01d1e 100644 --- a/tests/integrations/mcs/scheduling/server_test.go +++ b/tests/integrations/mcs/scheduling/server_test.go @@ -18,9 +18,11 @@ import ( "context" "fmt" "net/http" + "reflect" "testing" "time" + "github.com/docker/go-units" "github.com/pingcap/failpoint" "github.com/pingcap/kvproto/pkg/metapb" "github.com/pingcap/kvproto/pkg/pdpb" @@ -287,3 +289,80 @@ func checkEvictLeaderStoreIDs(re *require.Assertions, sc *schedulers.Controller, }) re.ElementsMatch(evictStoreIDs, expected) } + +func (suite *serverTestSuite) TestForwardRegionHeartbeat() { + re := suite.Require() + tc, err := tests.NewTestSchedulingCluster(suite.ctx, 1, suite.backendEndpoints) + re.NoError(err) + defer tc.Destroy() + tc.WaitForPrimaryServing(re) + + s := &server.GrpcServer{Server: suite.pdLeader.GetServer()} + for i := uint64(1); i <= 3; i++ { + resp, err := s.PutStore( + context.Background(), &pdpb.PutStoreRequest{ + Header: &pdpb.RequestHeader{ClusterId: suite.pdLeader.GetClusterID()}, + Store: &metapb.Store{ + Id: i, + Address: fmt.Sprintf("mock://%d", i), + State: metapb.StoreState_Up, + Version: "7.0.0", + }, + }, + ) + re.NoError(err) + re.Empty(resp.GetHeader().GetError()) + } + + grpcPDClient := testutil.MustNewGrpcClient(re, suite.pdLeader.GetServer().GetAddr()) + stream, err := grpcPDClient.RegionHeartbeat(suite.ctx) + re.NoError(err) + peers := []*metapb.Peer{ + {Id: 11, StoreId: 1}, + {Id: 22, StoreId: 2}, + {Id: 33, StoreId: 3}, + } + queryStats := &pdpb.QueryStats{ + Get: 5, + Coprocessor: 6, + Scan: 7, + Put: 8, + Delete: 9, + DeleteRange: 10, + AcquirePessimisticLock: 11, + Rollback: 12, + Prewrite: 13, + Commit: 14, + } + interval := &pdpb.TimeInterval{StartTimestamp: 0, EndTimestamp: 10} + downPeers := []*pdpb.PeerStats{{Peer: peers[2], DownSeconds: 100}} + pendingPeers := []*metapb.Peer{peers[2]} + regionReq := &pdpb.RegionHeartbeatRequest{ + Header: testutil.NewRequestHeader(suite.pdLeader.GetClusterID()), + Region: &metapb.Region{Id: 10, Peers: peers, StartKey: []byte("a"), EndKey: []byte("b")}, + Leader: peers[0], + DownPeers: downPeers, + PendingPeers: pendingPeers, + BytesWritten: 10, + 
BytesRead: 20, + KeysWritten: 100, + KeysRead: 200, + ApproximateSize: 30 * units.MiB, + ApproximateKeys: 300, + Interval: interval, + QueryStats: queryStats, + Term: 1, + CpuUsage: 100, + } + err = stream.Send(regionReq) + re.NoError(err) + testutil.Eventually(re, func() bool { + region := tc.GetPrimaryServer().GetCluster().GetRegion(10) + return region.GetBytesRead() == 20 && region.GetBytesWritten() == 10 && + region.GetKeysRead() == 200 && region.GetKeysWritten() == 100 && region.GetTerm() == 1 && + region.GetApproximateKeys() == 300 && region.GetApproximateSize() == 30 && + reflect.DeepEqual(region.GetLeader(), peers[0]) && + reflect.DeepEqual(region.GetInterval(), interval) && region.GetReadQueryNum() == 18 && region.GetWriteQueryNum() == 77 && + reflect.DeepEqual(region.GetDownPeers(), downPeers) && reflect.DeepEqual(region.GetPendingPeers(), pendingPeers) + }) +} diff --git a/tests/pdctl/scheduler/scheduler_test.go b/tests/pdctl/scheduler/scheduler_test.go index ee1a506369f..105c4e594a8 100644 --- a/tests/pdctl/scheduler/scheduler_test.go +++ b/tests/pdctl/scheduler/scheduler_test.go @@ -25,6 +25,7 @@ import ( "github.com/stretchr/testify/require" "github.com/tikv/pd/pkg/core" sc "github.com/tikv/pd/pkg/schedule/config" + "github.com/tikv/pd/pkg/slice" "github.com/tikv/pd/pkg/utils/testutil" "github.com/tikv/pd/pkg/versioninfo" "github.com/tikv/pd/tests" @@ -587,16 +588,35 @@ func TestForwardSchedulerRequest(t *testing.T) { server := cluster.GetServer(cluster.GetLeader()) re.NoError(server.BootstrapCluster()) backendEndpoints := server.GetAddr() - tc, err := tests.NewTestSchedulingCluster(ctx, 2, backendEndpoints) + tc, err := tests.NewTestSchedulingCluster(ctx, 1, backendEndpoints) re.NoError(err) defer tc.Destroy() tc.WaitForPrimaryServing(re) cmd := pdctlCmd.GetRootCmd() args := []string{"-u", backendEndpoints, "scheduler", "show"} - var slice []string - output, err := pdctl.ExecuteCommand(cmd, args...) - re.NoError(err) - re.NoError(json.Unmarshal(output, &slice)) - re.Contains(slice, "balance-leader-scheduler") + var sches []string + testutil.Eventually(re, func() bool { + output, err := pdctl.ExecuteCommand(cmd, args...) + re.NoError(err) + re.NoError(json.Unmarshal(output, &sches)) + return slice.Contains(sches, "balance-leader-scheduler") + }) + + mustUsage := func(args []string) { + output, err := pdctl.ExecuteCommand(cmd, args...) 
+ re.NoError(err) + re.Contains(string(output), "Usage") + } + mustUsage([]string{"-u", backendEndpoints, "scheduler", "pause", "balance-leader-scheduler"}) + echo := mustExec(re, cmd, []string{"-u", backendEndpoints, "scheduler", "pause", "balance-leader-scheduler", "60"}, nil) + re.Contains(echo, "Success!") + checkSchedulerWithStatusCommand := func(status string, expected []string) { + var schedulers []string + mustExec(re, cmd, []string{"-u", backendEndpoints, "scheduler", "show", "--status", status}, &schedulers) + re.Equal(expected, schedulers) + } + checkSchedulerWithStatusCommand("paused", []string{ + "balance-leader-scheduler", + }) } diff --git a/tests/server/api/api_test.go b/tests/server/api/api_test.go index 4533073f077..21e0e6674e0 100644 --- a/tests/server/api/api_test.go +++ b/tests/server/api/api_test.go @@ -138,6 +138,7 @@ func (suite *middlewareTestSuite) TearDownSuite() { func (suite *middlewareTestSuite) TestRequestInfoMiddleware() { suite.NoError(failpoint.Enable("github.com/tikv/pd/server/api/addRequestInfoMiddleware", "return(true)")) leader := suite.cluster.GetServer(suite.cluster.GetLeader()) + suite.NotNil(leader) input := map[string]interface{}{ "enable-audit": "true", @@ -208,6 +209,7 @@ func BenchmarkDoRequestWithServiceMiddleware(b *testing.B) { func (suite *middlewareTestSuite) TestRateLimitMiddleware() { leader := suite.cluster.GetServer(suite.cluster.GetLeader()) + suite.NotNil(leader) input := map[string]interface{}{ "enable-rate-limit": "true", } @@ -372,6 +374,7 @@ func (suite *middlewareTestSuite) TestRateLimitMiddleware() { func (suite *middlewareTestSuite) TestSwaggerUrl() { leader := suite.cluster.GetServer(suite.cluster.GetLeader()) + suite.NotNil(leader) req, _ := http.NewRequest(http.MethodGet, leader.GetAddr()+"/swagger/ui/index", nil) resp, err := dialClient.Do(req) suite.NoError(err) @@ -381,6 +384,7 @@ func (suite *middlewareTestSuite) TestSwaggerUrl() { func (suite *middlewareTestSuite) TestAuditPrometheusBackend() { leader := suite.cluster.GetServer(suite.cluster.GetLeader()) + suite.NotNil(leader) input := map[string]interface{}{ "enable-audit": "true", } @@ -449,6 +453,7 @@ func (suite *middlewareTestSuite) TestAuditLocalLogBackend() { fname := testutil.InitTempFileLogger("info") defer os.RemoveAll(fname) leader := suite.cluster.GetServer(suite.cluster.GetLeader()) + suite.NotNil(leader) input := map[string]interface{}{ "enable-audit": "true", }
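
The forwarding tests touched by this patch (the rule watch test, the region heartbeat test, and the pdctl scheduler test) share the same scaffolding: start a PD test cluster, attach a scheduling microservice cluster to the PD backend endpoints, and wait for the scheduling primary to be serving before asserting on forwarded state. The sketch below restates that shared setup using the test helpers that appear in the diffs above; the package name, the helper name setupSchedulingCluster, the single-node cluster size, and the *tests.TestSchedulingCluster return type (inferred from the constructor name) are illustrative assumptions, not part of the change itself.

package scheduling_test

import (
	"context"

	"github.com/stretchr/testify/require"
	"github.com/tikv/pd/tests"
)

// setupSchedulingCluster attaches a scheduling microservice cluster to an
// existing PD test cluster and blocks until its primary is serving, which is
// what the tests above wait for before asserting on forwarded requests.
// Sketch only: helper name and cluster size are illustrative.
func setupSchedulingCluster(ctx context.Context, re *require.Assertions, pdCluster *tests.TestCluster) *tests.TestSchedulingCluster {
	// The scheduling nodes register against the PD API server's address.
	backendEndpoints := pdCluster.GetServer(pdCluster.GetLeader()).GetAddr()
	// One scheduling node is enough for the forwarding tests; tests that
	// exercise primary switchover would pass a larger count here.
	tc, err := tests.NewTestSchedulingCluster(ctx, 1, backendEndpoints)
	re.NoError(err)
	// Wait until the primary scheduling server is elected and serving.
	tc.WaitForPrimaryServing(re)
	return tc
}

Callers remain responsible for tearing the scheduling cluster down with tc.Destroy(), typically via defer, exactly as the tests above do.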