diff --git a/go.mod b/go.mod
index bb72c0d5dba..fdde4fa7d19 100644
--- a/go.mod
+++ b/go.mod
@@ -33,7 +33,7 @@ require (
 	github.com/pingcap/errcode v0.3.0
 	github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c
 	github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00
-	github.com/pingcap/kvproto v0.0.0-20230911090708-d603cce32b96
+	github.com/pingcap/kvproto v0.0.0-20230920042517-db656f45023b
 	github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3
 	github.com/pingcap/sysutil v1.0.1-0.20230407040306-fb007c5aff21
 	github.com/pingcap/tidb-dashboard v0.0.0-20230911054332-22add1e00511
diff --git a/go.sum b/go.sum
index a007acf869e..5cff08df752 100644
--- a/go.sum
+++ b/go.sum
@@ -439,8 +439,8 @@ github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c/go.mod h1:X2r9ue
 github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00 h1:C3N3itkduZXDZFh4N3vQ5HEtld3S+Y+StULhWVvumU0=
 github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00/go.mod h1:4qGtCB0QK0wBzKtFEGDhxXnSnbQApw1gc9siScUl8ew=
 github.com/pingcap/kvproto v0.0.0-20191211054548-3c6b38ea5107/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w=
-github.com/pingcap/kvproto v0.0.0-20230911090708-d603cce32b96 h1:Upb52Po0Ev1lPKQdUT4suRwQ5Z49A7gEmJ0trADKftM=
-github.com/pingcap/kvproto v0.0.0-20230911090708-d603cce32b96/go.mod h1:r0q/CFcwvyeRhKtoqzmWMBebrtpIziQQ9vR+JKh1knc=
+github.com/pingcap/kvproto v0.0.0-20230920042517-db656f45023b h1:XwwIxepR+uuSYWhdQtstEdr67XUE7X6lpSIHVh5iWjs=
+github.com/pingcap/kvproto v0.0.0-20230920042517-db656f45023b/go.mod h1:r0q/CFcwvyeRhKtoqzmWMBebrtpIziQQ9vR+JKh1knc=
 github.com/pingcap/log v0.0.0-20210625125904-98ed8e2eb1c7/go.mod h1:8AanEdAHATuRurdGxZXBz0At+9avep+ub7U1AGYLIMM=
 github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 h1:HR/ylkkLmGdSSDaD8IDP+SZrdhV1Kibl9KrHxJ9eciw=
 github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4=
diff --git a/pkg/cluster/cluster.go b/pkg/cluster/cluster.go
new file mode 100644
index 00000000000..0b3e0351b16
--- /dev/null
+++ b/pkg/cluster/cluster.go
@@ -0,0 +1,65 @@
+// Copyright 2023 TiKV Project Authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package cluster
+
+import (
+	"github.com/tikv/pd/pkg/core"
+	"github.com/tikv/pd/pkg/schedule"
+	"github.com/tikv/pd/pkg/schedule/placement"
+	"github.com/tikv/pd/pkg/statistics"
+)
+
+// Cluster provides an overview of a cluster's basic information.
+type Cluster interface {
+	GetHotStat() *statistics.HotStat
+	GetRegionStats() *statistics.RegionStatistics
+	GetLabelStats() *statistics.LabelStatistics
+	GetCoordinator() *schedule.Coordinator
+	GetRuleManager() *placement.RuleManager
+}
+
+// HandleStatsAsync handles the flow asynchronously.
+func HandleStatsAsync(c Cluster, region *core.RegionInfo) {
+	c.GetHotStat().CheckWriteAsync(statistics.NewCheckExpiredItemTask(region))
+	c.GetHotStat().CheckReadAsync(statistics.NewCheckExpiredItemTask(region))
+	reportInterval := region.GetInterval()
+	interval := reportInterval.GetEndTimestamp() - reportInterval.GetStartTimestamp()
+	for _, peer := range region.GetPeers() {
+		peerInfo := core.NewPeerInfo(peer, region.GetWriteLoads(), interval)
+		c.GetHotStat().CheckWriteAsync(statistics.NewCheckPeerTask(peerInfo, region))
+	}
+	c.GetCoordinator().GetSchedulersController().CheckTransferWitnessLeader(region)
+}
+
+// HandleOverlaps handles the overlap regions.
+func HandleOverlaps(c Cluster, overlaps []*core.RegionInfo) {
+	for _, item := range overlaps {
+		if c.GetRegionStats() != nil {
+			c.GetRegionStats().ClearDefunctRegion(item.GetID())
+		}
+		c.GetLabelStats().ClearDefunctRegion(item.GetID())
+		c.GetRuleManager().InvalidCache(item.GetID())
+	}
+}
+
+// Collect collects the cluster information.
+func Collect(c Cluster, region *core.RegionInfo, stores []*core.StoreInfo, hasRegionStats, isNew, isPrepared bool) {
+	if hasRegionStats {
+		c.GetRegionStats().Observe(region, stores)
+	}
+	if !isPrepared && isNew {
+		c.GetCoordinator().GetPrepareChecker().Collect(region)
+	}
+}
diff --git a/pkg/core/region.go b/pkg/core/region.go
index 57ba5cb3db0..4540f7aafb3 100644
--- a/pkg/core/region.go
+++ b/pkg/core/region.go
@@ -143,8 +143,31 @@ const (
 	InitClusterRegionThreshold = 100
 )
 
+// RegionHeartbeatResponse is the interface for region heartbeat response.
+type RegionHeartbeatResponse interface {
+	GetTargetPeer() *metapb.Peer
+	GetRegionId() uint64
+}
+
+// RegionHeartbeatRequest is the interface for region heartbeat request.
+type RegionHeartbeatRequest interface {
+	GetTerm() uint64
+	GetRegion() *metapb.Region
+	GetLeader() *metapb.Peer
+	GetDownPeers() []*pdpb.PeerStats
+	GetPendingPeers() []*metapb.Peer
+	GetBytesWritten() uint64
+	GetKeysWritten() uint64
+	GetBytesRead() uint64
+	GetKeysRead() uint64
+	GetInterval() *pdpb.TimeInterval
+	GetQueryStats() *pdpb.QueryStats
+	GetApproximateSize() uint64
+	GetApproximateKeys() uint64
+}
+
 // RegionFromHeartbeat constructs a Region from region heartbeat.
-func RegionFromHeartbeat(heartbeat *pdpb.RegionHeartbeatRequest, opts ...RegionCreateOption) *RegionInfo {
+func RegionFromHeartbeat(heartbeat RegionHeartbeatRequest, opts ...RegionCreateOption) *RegionInfo {
 	// Convert unit to MB.
 	// If region isn't empty and less than 1MB, use 1MB instead.
 	regionSize := heartbeat.GetApproximateSize() / units.MiB
@@ -153,25 +176,28 @@ func RegionFromHeartbeat(heartbeat *pdpb.RegionHeartbeatRequest, opts ...RegionC
 	if heartbeat.GetApproximateSize() > 0 && regionSize < EmptyRegionApproximateSize {
 		regionSize = EmptyRegionApproximateSize
 	}
 
-	regionKvSize := heartbeat.GetApproximateKvSize() / units.MiB
 	region := &RegionInfo{
-		term:              heartbeat.GetTerm(),
-		meta:              heartbeat.GetRegion(),
-		leader:            heartbeat.GetLeader(),
-		downPeers:         heartbeat.GetDownPeers(),
-		pendingPeers:      heartbeat.GetPendingPeers(),
-		cpuUsage:          heartbeat.GetCpuUsage(),
-		writtenBytes:      heartbeat.GetBytesWritten(),
-		writtenKeys:       heartbeat.GetKeysWritten(),
-		readBytes:         heartbeat.GetBytesRead(),
-		readKeys:          heartbeat.GetKeysRead(),
-		approximateSize:   int64(regionSize),
-		approximateKvSize: int64(regionKvSize),
-		approximateKeys:   int64(heartbeat.GetApproximateKeys()),
-		interval:          heartbeat.GetInterval(),
-		replicationStatus: heartbeat.GetReplicationStatus(),
-		queryStats:        heartbeat.GetQueryStats(),
+		term:            heartbeat.GetTerm(),
+		meta:            heartbeat.GetRegion(),
+		leader:          heartbeat.GetLeader(),
+		downPeers:       heartbeat.GetDownPeers(),
+		pendingPeers:    heartbeat.GetPendingPeers(),
+		writtenBytes:    heartbeat.GetBytesWritten(),
+		writtenKeys:     heartbeat.GetKeysWritten(),
+		readBytes:       heartbeat.GetBytesRead(),
+		readKeys:        heartbeat.GetKeysRead(),
+		approximateSize: int64(regionSize),
+		approximateKeys: int64(heartbeat.GetApproximateKeys()),
+		interval:        heartbeat.GetInterval(),
+		queryStats:      heartbeat.GetQueryStats(),
+	}
+
+	// scheduling service doesn't need the following fields.
+	if h, ok := heartbeat.(*pdpb.RegionHeartbeatRequest); ok {
+		region.approximateKvSize = int64(h.GetApproximateKvSize() / units.MiB)
+		region.replicationStatus = h.GetReplicationStatus()
+		region.cpuUsage = h.GetCpuUsage()
 	}
 
 	for _, opt := range opts {
diff --git a/pkg/mcs/scheduling/server/cluster.go b/pkg/mcs/scheduling/server/cluster.go
index eb6876db0a1..0b9924f230b 100644
--- a/pkg/mcs/scheduling/server/cluster.go
+++ b/pkg/mcs/scheduling/server/cluster.go
@@ -10,6 +10,7 @@ import (
 	"github.com/pingcap/kvproto/pkg/pdpb"
 	"github.com/pingcap/kvproto/pkg/schedulingpb"
 	"github.com/pingcap/log"
+	"github.com/tikv/pd/pkg/cluster"
 	"github.com/tikv/pd/pkg/core"
 	"github.com/tikv/pd/pkg/errs"
 	"github.com/tikv/pd/pkg/mcs/scheduling/server/config"
@@ -17,6 +18,7 @@ import (
 	sc "github.com/tikv/pd/pkg/schedule/config"
 	"github.com/tikv/pd/pkg/schedule/hbstream"
 	"github.com/tikv/pd/pkg/schedule/labeler"
+	"github.com/tikv/pd/pkg/schedule/operator"
 	"github.com/tikv/pd/pkg/schedule/placement"
 	"github.com/tikv/pd/pkg/schedule/schedulers"
 	"github.com/tikv/pd/pkg/slice"
@@ -38,7 +40,7 @@ type Cluster struct {
 	ruleManager       *placement.RuleManager
 	labelerManager    *labeler.RegionLabeler
 	regionStats       *statistics.RegionStatistics
-	labelLevelStats   *statistics.LabelStatistics
+	labelStats        *statistics.LabelStatistics
 	hotStat           *statistics.HotStat
 	storage           storage.Storage
 	coordinator       *schedule.Coordinator
@@ -66,8 +68,8 @@ func NewCluster(parentCtx context.Context, persistConfig *config.PersistConfig,
 		labelerManager:    labelerManager,
 		persistConfig:     persistConfig,
 		hotStat:           statistics.NewHotStat(ctx),
+		labelStats:        statistics.NewLabelStatistics(),
 		regionStats:       statistics.NewRegionStatistics(basicCluster, persistConfig, ruleManager),
-		labelLevelStats:   statistics.NewLabelStatistics(),
 		storage:           storage,
 		clusterID:         clusterID,
 		checkMembershipCh: checkMembershipCh,
@@ -86,6 +88,21 @@ func (c *Cluster) GetCoordinator() *schedule.Coordinator {
 	return c.coordinator
 }
 
+// GetHotStat gets hot stat.
+func (c *Cluster) GetHotStat() *statistics.HotStat {
+	return c.hotStat
+}
+
+// GetRegionStats gets region statistics.
+func (c *Cluster) GetRegionStats() *statistics.RegionStatistics {
+	return c.regionStats
+}
+
+// GetLabelStats gets label statistics.
+func (c *Cluster) GetLabelStats() *statistics.LabelStatistics {
+	return c.labelStats
+}
+
 // GetBasicCluster returns the basic cluster.
 func (c *Cluster) GetBasicCluster() *core.BasicCluster {
 	return c.BasicCluster
 }
@@ -287,7 +304,7 @@ func (c *Cluster) waitSchedulersInitialized() {
 // UpdateRegionsLabelLevelStats updates the status of the region label level by types.
 func (c *Cluster) UpdateRegionsLabelLevelStats(regions []*core.RegionInfo) {
 	for _, region := range regions {
-		c.labelLevelStats.Observe(region, c.getStoresWithoutLabelLocked(region, core.EngineKey, core.EngineTiFlash), c.persistConfig.GetLocationLabels())
+		c.labelStats.Observe(region, c.getStoresWithoutLabelLocked(region, core.EngineKey, core.EngineTiFlash), c.persistConfig.GetLocationLabels())
 	}
 }
 
@@ -389,3 +406,61 @@ func (c *Cluster) StopBackgroundJobs() {
 	c.cancel()
 	c.wg.Wait()
 }
+
+// HandleRegionHeartbeat processes RegionInfo reports from client.
+func (c *Cluster) HandleRegionHeartbeat(region *core.RegionInfo) error {
+	if err := c.processRegionHeartbeat(region); err != nil {
+		return err
+	}
+
+	c.coordinator.GetOperatorController().Dispatch(region, operator.DispatchFromHeartBeat, c.coordinator.RecordOpStepWithTTL)
+	return nil
+}
+
+// processRegionHeartbeat updates the region information.
+func (c *Cluster) processRegionHeartbeat(region *core.RegionInfo) error {
+	origin, _, err := c.PreCheckPutRegion(region)
+	if err != nil {
+		return err
+	}
+	if c.GetStoreConfig().IsEnableRegionBucket() {
+		region.InheritBuckets(origin)
+	}
+
+	cluster.HandleStatsAsync(c, region)
+
+	hasRegionStats := c.regionStats != nil
+	// Save to storage if meta is updated, except for flashback.
+	// Save to cache if meta or leader is updated, or contains any down/pending peer.
+	// Mark isNew if the region in cache does not have leader.
+	isNew, _, saveCache, _ := core.GenerateRegionGuideFunc(true)(region, origin)
+	if !saveCache && !isNew {
+		// Due to some config changes need to update the region stats as well,
+		// so we do some extra checks here.
+		if hasRegionStats && c.regionStats.RegionStatsNeedUpdate(region) {
+			c.regionStats.Observe(region, c.GetRegionStores(region))
+		}
+		return nil
+	}
+
+	var overlaps []*core.RegionInfo
+	if saveCache {
+		// To prevent a concurrent heartbeat of another region from overriding the up-to-date region info by a stale one,
+		// check its validation again here.
+		//
+		// However it can't solve the race condition of concurrent heartbeats from the same region.
+		if overlaps, err = c.AtomicCheckAndPutRegion(region); err != nil {
+			return err
+		}
+
+		cluster.HandleOverlaps(c, overlaps)
+	}
+
+	cluster.Collect(c, region, c.GetRegionStores(region), hasRegionStats, isNew, c.IsPrepared())
+	return nil
+}
+
+// IsPrepared return true if the prepare checker is ready.
+func (c *Cluster) IsPrepared() bool {
+	return c.coordinator.GetPrepareChecker().IsPrepared()
+}
diff --git a/pkg/mcs/scheduling/server/config/config.go b/pkg/mcs/scheduling/server/config/config.go
index 82c15632b3d..e1d680069ce 100644
--- a/pkg/mcs/scheduling/server/config/config.go
+++ b/pkg/mcs/scheduling/server/config/config.go
@@ -110,7 +110,6 @@ func (c *Config) Parse(flagSet *pflag.FlagSet) error {
 	configutil.AdjustCommandLineString(flagSet, &c.BackendEndpoints, "backend-endpoints")
 	configutil.AdjustCommandLineString(flagSet, &c.ListenAddr, "listen-addr")
 	configutil.AdjustCommandLineString(flagSet, &c.AdvertiseListenAddr, "advertise-listen-addr")
-
 	return c.adjust(meta)
 }
 
diff --git a/pkg/mcs/scheduling/server/grpc_service.go b/pkg/mcs/scheduling/server/grpc_service.go
index f615e0c37c0..4558688822a 100644
--- a/pkg/mcs/scheduling/server/grpc_service.go
+++ b/pkg/mcs/scheduling/server/grpc_service.go
@@ -16,13 +16,19 @@ package server
 
 import (
 	"context"
+	"io"
 	"net/http"
+	"sync/atomic"
+	"time"
 
+	"github.com/pingcap/errors"
 	"github.com/pingcap/kvproto/pkg/schedulingpb"
 	"github.com/pingcap/log"
 	bs "github.com/tikv/pd/pkg/basicserver"
+	"github.com/tikv/pd/pkg/core"
 	"github.com/tikv/pd/pkg/mcs/registry"
 	"github.com/tikv/pd/pkg/utils/apiutil"
+	"github.com/tikv/pd/pkg/utils/logutil"
 	"go.uber.org/zap"
 	"google.golang.org/grpc"
 	"google.golang.org/grpc/codes"
@@ -67,6 +73,104 @@ func NewService[T ConfigProvider](svr bs.Server) registry.RegistrableService {
 	}
 }
 
+// heartbeatServer wraps Scheduling_RegionHeartbeatServer to ensure when any error
+// occurs on Send() or Recv(), both endpoints will be closed.
+type heartbeatServer struct {
+	stream schedulingpb.Scheduling_RegionHeartbeatServer
+	closed int32
+}
+
+func (s *heartbeatServer) Send(m core.RegionHeartbeatResponse) error {
+	if atomic.LoadInt32(&s.closed) == 1 {
+		return io.EOF
+	}
+	done := make(chan error, 1)
+	go func() {
+		defer logutil.LogPanic()
+		done <- s.stream.Send(m.(*schedulingpb.RegionHeartbeatResponse))
+	}()
+	timer := time.NewTimer(5 * time.Second)
+	defer timer.Stop()
+	select {
+	case err := <-done:
+		if err != nil {
+			atomic.StoreInt32(&s.closed, 1)
+		}
+		return errors.WithStack(err)
+	case <-timer.C:
+		atomic.StoreInt32(&s.closed, 1)
+		return status.Errorf(codes.DeadlineExceeded, "send heartbeat timeout")
+	}
+}
+
+func (s *heartbeatServer) Recv() (*schedulingpb.RegionHeartbeatRequest, error) {
+	if atomic.LoadInt32(&s.closed) == 1 {
+		return nil, io.EOF
+	}
+	req, err := s.stream.Recv()
+	if err != nil {
+		atomic.StoreInt32(&s.closed, 1)
+		return nil, errors.WithStack(err)
+	}
+	return req, nil
+}
+
+// RegionHeartbeat implements gRPC PDServer.
+func (s *Service) RegionHeartbeat(stream schedulingpb.Scheduling_RegionHeartbeatServer) error {
+	var (
+		server   = &heartbeatServer{stream: stream}
+		cancel   context.CancelFunc
+		lastBind time.Time
+	)
+	defer func() {
+		// cancel the forward stream
+		if cancel != nil {
+			cancel()
+		}
+	}()
+
+	for {
+		request, err := server.Recv()
+		if err == io.EOF {
+			return nil
+		}
+		if err != nil {
+			return errors.WithStack(err)
+		}
+
+		c := s.GetCluster()
+		if c == nil {
+			resp := &schedulingpb.RegionHeartbeatResponse{Header: &schedulingpb.ResponseHeader{
+				ClusterId: s.clusterID,
+				Error: &schedulingpb.Error{
+					Type:    schedulingpb.ErrorType_NOT_BOOTSTRAPPED,
+					Message: "scheduling server is not initialized yet",
+				},
+			}}
+			err := server.Send(resp)
+			return errors.WithStack(err)
+		}
+
+		storeID := request.GetLeader().GetStoreId()
+		store := c.GetStore(storeID)
+		if store == nil {
+			return errors.Errorf("invalid store ID %d, not found", storeID)
+		}
+
+		if time.Since(lastBind) > time.Minute {
+			s.hbStreams.BindStream(storeID, server)
+			lastBind = time.Now()
+		}
+		region := core.RegionFromHeartbeat(request, core.SetFromHeartbeat(true))
+		err = c.HandleRegionHeartbeat(region)
+		if err != nil {
+			// TODO: if we need to send the error back to API server.
+			log.Error("failed handle region heartbeat", zap.Error(err))
+			continue
+		}
+	}
+}
+
 // StoreHeartbeat implements gRPC PDServer.
 func (s *Service) StoreHeartbeat(ctx context.Context, request *schedulingpb.StoreHeartbeatRequest) (*schedulingpb.StoreHeartbeatResponse, error) {
 	c := s.GetCluster()
diff --git a/pkg/mcs/scheduling/server/server.go b/pkg/mcs/scheduling/server/server.go
index f4c5c676dd3..fd7621bf2cb 100644
--- a/pkg/mcs/scheduling/server/server.go
+++ b/pkg/mcs/scheduling/server/server.go
@@ -437,7 +437,7 @@ func (s *Server) startCluster(context.Context) error {
 	if err != nil {
 		return err
 	}
-	s.hbStreams = hbstream.NewHeartbeatStreams(s.Context(), s.clusterID, s.basicCluster)
+	s.hbStreams = hbstream.NewHeartbeatStreams(s.Context(), s.clusterID, utils.SchedulingServiceName, s.basicCluster)
 	s.cluster, err = NewCluster(s.Context(), s.persistConfig, s.storage, s.basicCluster, s.hbStreams, s.clusterID, s.checkMembershipCh)
 	if err != nil {
 		return err
diff --git a/pkg/mock/mockhbstream/mockhbstream.go b/pkg/mock/mockhbstream/mockhbstream.go
index c94042bf102..289f31d63dd 100644
--- a/pkg/mock/mockhbstream/mockhbstream.go
+++ b/pkg/mock/mockhbstream/mockhbstream.go
@@ -25,18 +25,18 @@ import (
 
 // HeartbeatStream is used to mock HeartbeatStream for test use.
 type HeartbeatStream struct {
-	ch chan *pdpb.RegionHeartbeatResponse
+	ch chan core.RegionHeartbeatResponse
 }
 
 // NewHeartbeatStream creates a new HeartbeatStream.
 func NewHeartbeatStream() HeartbeatStream {
 	return HeartbeatStream{
-		ch: make(chan *pdpb.RegionHeartbeatResponse),
+		ch: make(chan core.RegionHeartbeatResponse),
 	}
 }
 
 // Send mocks method.
-func (s HeartbeatStream) Send(m *pdpb.RegionHeartbeatResponse) error {
+func (s HeartbeatStream) Send(m core.RegionHeartbeatResponse) error {
 	select {
 	case <-time.After(time.Second):
 		return errors.New("timeout")
@@ -52,7 +52,7 @@ func (s HeartbeatStream) SendMsg(region *core.RegionInfo, msg *pdpb.RegionHeartb
 func (s HeartbeatStream) BindStream(storeID uint64, stream hbstream.HeartbeatStream) {}
 
 // Recv mocks method.
-func (s HeartbeatStream) Recv() *pdpb.RegionHeartbeatResponse {
+func (s HeartbeatStream) Recv() core.RegionHeartbeatResponse {
 	select {
 	case <-time.After(time.Millisecond * 10):
 		return nil
diff --git a/pkg/mock/mockhbstream/mockhbstream_test.go b/pkg/mock/mockhbstream/mockhbstream_test.go
index 46af7df534b..a8e88f61aee 100644
--- a/pkg/mock/mockhbstream/mockhbstream_test.go
+++ b/pkg/mock/mockhbstream/mockhbstream_test.go
@@ -22,12 +22,10 @@ import (
 	"github.com/pingcap/kvproto/pkg/metapb"
 	"github.com/pingcap/kvproto/pkg/pdpb"
 	"github.com/stretchr/testify/require"
-	"github.com/tikv/pd/pkg/core"
 	"github.com/tikv/pd/pkg/mock/mockcluster"
 	"github.com/tikv/pd/pkg/mock/mockconfig"
 	"github.com/tikv/pd/pkg/schedule/hbstream"
 	"github.com/tikv/pd/pkg/utils/testutil"
-	"github.com/tikv/pd/pkg/utils/typeutil"
 )
 
 func TestActivity(t *testing.T) {
@@ -41,37 +39,33 @@ func TestActivity(t *testing.T) {
 	cluster.AddRegionStore(2, 0)
 	cluster.AddLeaderRegion(1, 1)
 	region := cluster.GetRegion(1)
-	msg := &pdpb.RegionHeartbeatResponse{
-		ChangePeer: &pdpb.ChangePeer{Peer: &metapb.Peer{Id: 2, StoreId: 2}, ChangeType: eraftpb.ConfChangeType_AddLearnerNode},
-	}
-
 	hbs := hbstream.NewTestHeartbeatStreams(ctx, cluster.ID, cluster, true)
 	stream1, stream2 := NewHeartbeatStream(), NewHeartbeatStream()
 
 	// Active stream is stream1.
 	hbs.BindStream(1, stream1)
 	testutil.Eventually(re, func() bool {
-		newMsg := typeutil.DeepClone(msg, core.RegionHeartbeatResponseFactory)
-		hbs.SendMsg(region, newMsg)
+		msg := &hbstream.Operation{ChangePeer: &pdpb.ChangePeer{Peer: &metapb.Peer{Id: 2, StoreId: 2}, ChangeType: eraftpb.ConfChangeType_AddLearnerNode}}
+		hbs.SendMsg(region, msg)
 		return stream1.Recv() != nil && stream2.Recv() == nil
 	})
 	// Rebind to stream2.
 	hbs.BindStream(1, stream2)
 	testutil.Eventually(re, func() bool {
-		newMsg := typeutil.DeepClone(msg, core.RegionHeartbeatResponseFactory)
-		hbs.SendMsg(region, newMsg)
+		msg := &hbstream.Operation{ChangePeer: &pdpb.ChangePeer{Peer: &metapb.Peer{Id: 2, StoreId: 2}, ChangeType: eraftpb.ConfChangeType_AddLearnerNode}}
+		hbs.SendMsg(region, msg)
 		return stream1.Recv() == nil && stream2.Recv() != nil
 	})
 	// SendErr to stream2.
 	hbs.SendErr(pdpb.ErrorType_UNKNOWN, "test error", &metapb.Peer{Id: 1, StoreId: 1})
 	res := stream2.Recv()
 	re.NotNil(res)
-	re.NotNil(res.GetHeader().GetError())
+	re.NotNil(res.(*pdpb.RegionHeartbeatResponse).GetHeader().GetError())
 	// Switch back to 1 again.
 	hbs.BindStream(1, stream1)
 	testutil.Eventually(re, func() bool {
-		newMsg := typeutil.DeepClone(msg, core.RegionHeartbeatResponseFactory)
-		hbs.SendMsg(region, newMsg)
+		msg := &hbstream.Operation{ChangePeer: &pdpb.ChangePeer{Peer: &metapb.Peer{Id: 2, StoreId: 2}, ChangeType: eraftpb.ConfChangeType_AddLearnerNode}}
+		hbs.SendMsg(region, msg)
 		return stream1.Recv() != nil && stream2.Recv() == nil
 	})
 }
diff --git a/pkg/schedule/hbstream/heartbeat_streams.go b/pkg/schedule/hbstream/heartbeat_streams.go
index d80b6ff3a46..e7d7f688035 100644
--- a/pkg/schedule/hbstream/heartbeat_streams.go
+++ b/pkg/schedule/hbstream/heartbeat_streams.go
@@ -23,16 +23,30 @@ import (
 	"github.com/pingcap/errors"
 	"github.com/pingcap/kvproto/pkg/metapb"
 	"github.com/pingcap/kvproto/pkg/pdpb"
+	"github.com/pingcap/kvproto/pkg/schedulingpb"
 	"github.com/pingcap/log"
 	"github.com/tikv/pd/pkg/core"
 	"github.com/tikv/pd/pkg/errs"
+	"github.com/tikv/pd/pkg/mcs/utils"
 	"github.com/tikv/pd/pkg/utils/logutil"
 	"go.uber.org/zap"
 )
 
+// Operation is detailed scheduling step of a region.
+type Operation struct {
+	ChangePeer *pdpb.ChangePeer
+	// Pd can return transfer_leader to let TiKV does leader transfer itself.
+	TransferLeader  *pdpb.TransferLeader
+	Merge           *pdpb.Merge
+	// PD sends split_region to let TiKV split a region into two regions.
+	SplitRegion     *pdpb.SplitRegion
+	ChangePeerV2    *pdpb.ChangePeerV2
+	SwitchWitnesses *pdpb.BatchSwitchWitness
+}
+
 // HeartbeatStream is an interface.
 type HeartbeatStream interface {
-	Send(*pdpb.RegionHeartbeatResponse) error
+	Send(core.RegionHeartbeatResponse) error
 }
 
 const (
@@ -52,33 +66,35 @@ type HeartbeatStreams struct {
 	hbStreamCancel context.CancelFunc
 	clusterID      uint64
 	streams        map[uint64]HeartbeatStream
-	msgCh          chan *pdpb.RegionHeartbeatResponse
+	msgCh          chan core.RegionHeartbeatResponse
 	streamCh       chan streamUpdate
 	storeInformer  core.StoreSetInformer
+	typ            string
 	needRun        bool // For test only.
 }
 
 // NewHeartbeatStreams creates a new HeartbeatStreams which enable background running by default.
-func NewHeartbeatStreams(ctx context.Context, clusterID uint64, storeInformer core.StoreSetInformer) *HeartbeatStreams {
-	return newHbStreams(ctx, clusterID, storeInformer, true)
+func NewHeartbeatStreams(ctx context.Context, clusterID uint64, typ string, storeInformer core.StoreSetInformer) *HeartbeatStreams {
+	return newHbStreams(ctx, clusterID, typ, storeInformer, true)
 }
 
 // NewTestHeartbeatStreams creates a new HeartbeatStreams for test purpose only.
 // Please use NewHeartbeatStreams for other usage.
 func NewTestHeartbeatStreams(ctx context.Context, clusterID uint64, storeInformer core.StoreSetInformer, needRun bool) *HeartbeatStreams {
-	return newHbStreams(ctx, clusterID, storeInformer, needRun)
+	return newHbStreams(ctx, clusterID, "", storeInformer, needRun)
 }
 
-func newHbStreams(ctx context.Context, clusterID uint64, storeInformer core.StoreSetInformer, needRun bool) *HeartbeatStreams {
+func newHbStreams(ctx context.Context, clusterID uint64, typ string, storeInformer core.StoreSetInformer, needRun bool) *HeartbeatStreams {
 	hbStreamCtx, hbStreamCancel := context.WithCancel(ctx)
 	hs := &HeartbeatStreams{
 		hbStreamCtx:    hbStreamCtx,
 		hbStreamCancel: hbStreamCancel,
 		clusterID:      clusterID,
 		streams:        make(map[uint64]HeartbeatStream),
-		msgCh:          make(chan *pdpb.RegionHeartbeatResponse, heartbeatChanCapacity),
+		msgCh:          make(chan core.RegionHeartbeatResponse, heartbeatChanCapacity),
 		streamCh:       make(chan streamUpdate, 1),
 		storeInformer:  storeInformer,
+		typ:            typ,
 		needRun:        needRun,
 	}
 	if needRun {
@@ -96,7 +112,13 @@ func (s *HeartbeatStreams) run() {
 	keepAliveTicker := time.NewTicker(heartbeatStreamKeepAliveInterval)
 	defer keepAliveTicker.Stop()
 
-	keepAlive := &pdpb.RegionHeartbeatResponse{Header: &pdpb.ResponseHeader{ClusterId: s.clusterID}}
+	var keepAlive core.RegionHeartbeatResponse
+	switch s.typ {
+	case utils.SchedulingServiceName:
+		keepAlive = &schedulingpb.RegionHeartbeatResponse{Header: &schedulingpb.ResponseHeader{ClusterId: s.clusterID}}
+	default:
+		keepAlive = &pdpb.RegionHeartbeatResponse{Header: &pdpb.ResponseHeader{ClusterId: s.clusterID}}
+	}
 
 	for {
 		select {
@@ -108,7 +130,7 @@ func (s *HeartbeatStreams) run() {
 			store := s.storeInformer.GetStore(storeID)
 			if store == nil {
 				log.Error("failed to get store",
-					zap.Uint64("region-id", msg.RegionId),
+					zap.Uint64("region-id", msg.GetRegionId()),
 					zap.Uint64("store-id", storeID), errs.ZapError(errs.ErrGetSourceStore))
 				delete(s.streams, storeID)
 				continue
@@ -117,7 +139,7 @@ func (s *HeartbeatStreams) run() {
 			if stream, ok := s.streams[storeID]; ok {
 				if err := stream.Send(msg); err != nil {
 					log.Error("send heartbeat message fail",
-						zap.Uint64("region-id", msg.RegionId), errs.ZapError(errs.ErrGRPCSend.Wrap(err).GenWithStackByArgs()))
+						zap.Uint64("region-id", msg.GetRegionId()), errs.ZapError(errs.ErrGRPCSend.Wrap(err).GenWithStackByArgs()))
 					delete(s.streams, storeID)
 					heartbeatStreamCounter.WithLabelValues(storeAddress, storeLabel, "push", "err").Inc()
 				} else {
@@ -125,7 +147,7 @@ func (s *HeartbeatStreams) run() {
 				}
 			} else {
 				log.Debug("heartbeat stream not found, skip send message",
-					zap.Uint64("region-id", msg.RegionId),
+					zap.Uint64("region-id", msg.GetRegionId()),
 					zap.Uint64("store-id", storeID))
 				heartbeatStreamCounter.WithLabelValues(storeAddress, storeLabel, "push", "skip").Inc()
 			}
@@ -174,18 +196,44 @@ func (s *HeartbeatStreams) BindStream(storeID uint64, stream HeartbeatStream) {
 }
 
 // SendMsg sends a message to related store.
-func (s *HeartbeatStreams) SendMsg(region *core.RegionInfo, msg *pdpb.RegionHeartbeatResponse) {
+func (s *HeartbeatStreams) SendMsg(region *core.RegionInfo, op *Operation) {
 	if region.GetLeader() == nil {
 		return
 	}
 
-	msg.Header = &pdpb.ResponseHeader{ClusterId: s.clusterID}
-	msg.RegionId = region.GetID()
-	msg.RegionEpoch = region.GetRegionEpoch()
-	msg.TargetPeer = region.GetLeader()
+	// TODO: use generic
+	var resp core.RegionHeartbeatResponse
+	switch s.typ {
+	case utils.SchedulingServiceName:
+		resp = &schedulingpb.RegionHeartbeatResponse{
+			Header:          &schedulingpb.ResponseHeader{ClusterId: s.clusterID},
+			RegionId:        region.GetID(),
+			RegionEpoch:     region.GetRegionEpoch(),
+			TargetPeer:      region.GetLeader(),
+			ChangePeer:      op.ChangePeer,
+			TransferLeader:  op.TransferLeader,
+			Merge:           op.Merge,
+			SplitRegion:     op.SplitRegion,
+			ChangePeerV2:    op.ChangePeerV2,
+			SwitchWitnesses: op.SwitchWitnesses,
+		}
+	default:
+		resp = &pdpb.RegionHeartbeatResponse{
+			Header:          &pdpb.ResponseHeader{ClusterId: s.clusterID},
+			RegionId:        region.GetID(),
+			RegionEpoch:     region.GetRegionEpoch(),
+			TargetPeer:      region.GetLeader(),
+			ChangePeer:      op.ChangePeer,
+			TransferLeader:  op.TransferLeader,
+			Merge:           op.Merge,
+			SplitRegion:     op.SplitRegion,
+			ChangePeerV2:    op.ChangePeerV2,
+			SwitchWitnesses: op.SwitchWitnesses,
+		}
+	}
 
 	select {
-	case s.msgCh <- msg:
+	case s.msgCh <- resp:
 	case <-s.hbStreamCtx.Done():
 	}
 }
diff --git a/pkg/schedule/operator/step.go b/pkg/schedule/operator/step.go
index 1a2107cb265..6f14cbb326b 100644
--- a/pkg/schedule/operator/step.go
+++ b/pkg/schedule/operator/step.go
@@ -28,6 +28,7 @@ import (
 	"github.com/tikv/pd/pkg/core"
 	"github.com/tikv/pd/pkg/core/storelimit"
 	"github.com/tikv/pd/pkg/schedule/config"
+	"github.com/tikv/pd/pkg/schedule/hbstream"
 	"github.com/tikv/pd/pkg/utils/typeutil"
 	"go.uber.org/zap"
 )
@@ -57,7 +58,7 @@ type OpStep interface {
 	CheckInProgress(ci *core.BasicCluster, config config.SharedConfigProvider, region *core.RegionInfo) error
 	Influence(opInfluence OpInfluence, region *core.RegionInfo)
 	Timeout(regionSize int64) time.Duration
-	GetCmd(region *core.RegionInfo, useConfChangeV2 bool) *pdpb.RegionHeartbeatResponse
+	GetCmd(region *core.RegionInfo, useConfChangeV2 bool) *hbstream.Operation
 }
 
 // TransferLeader is an OpStep that transfers a region's leader.
@@ -126,12 +127,12 @@ func (tl TransferLeader) Timeout(regionSize int64) time.Duration {
 }
 
 // GetCmd returns the schedule command for heartbeat response.
-func (tl TransferLeader) GetCmd(region *core.RegionInfo, useConfChangeV2 bool) *pdpb.RegionHeartbeatResponse {
+func (tl TransferLeader) GetCmd(region *core.RegionInfo, useConfChangeV2 bool) *hbstream.Operation {
 	peers := make([]*metapb.Peer, 0, len(tl.ToStores))
 	for _, storeID := range tl.ToStores {
 		peers = append(peers, region.GetStorePeer(storeID))
 	}
-	return &pdpb.RegionHeartbeatResponse{
+	return &hbstream.Operation{
 		TransferLeader: &pdpb.TransferLeader{
 			Peer:  region.GetStorePeer(tl.ToStore),
 			Peers: peers,
@@ -210,7 +211,7 @@ func (ap AddPeer) Timeout(regionSize int64) time.Duration {
 }
 
 // GetCmd returns the schedule command for heartbeat response.
-func (ap AddPeer) GetCmd(region *core.RegionInfo, useConfChangeV2 bool) *pdpb.RegionHeartbeatResponse {
+func (ap AddPeer) GetCmd(region *core.RegionInfo, useConfChangeV2 bool) *hbstream.Operation {
 	peer := region.GetStorePeer(ap.ToStore)
 	if peer != nil {
 		// The newly added peer is pending.
@@ -274,7 +275,7 @@ func (bw BecomeWitness) Timeout(regionSize int64) time.Duration {
 }
 
 // GetCmd returns the schedule command for heartbeat response.
-func (bw BecomeWitness) GetCmd(_ *core.RegionInfo, _ bool) *pdpb.RegionHeartbeatResponse {
+func (bw BecomeWitness) GetCmd(_ *core.RegionInfo, _ bool) *hbstream.Operation {
 	return switchWitness(bw.PeerID, true)
 }
 
@@ -342,7 +343,7 @@ func (bn BecomeNonWitness) Timeout(regionSize int64) time.Duration {
 }
 
 // GetCmd returns the schedule command for heartbeat response.
-func (bn BecomeNonWitness) GetCmd(region *core.RegionInfo, useConfChangeV2 bool) *pdpb.RegionHeartbeatResponse {
+func (bn BecomeNonWitness) GetCmd(region *core.RegionInfo, useConfChangeV2 bool) *hbstream.Operation {
 	return switchWitness(bn.PeerID, false)
 }
 
@@ -426,7 +427,7 @@ func (bsw BatchSwitchWitness) Timeout(regionSize int64) time.Duration {
 }
 
 // GetCmd returns the schedule command for heartbeat response.
-func (bsw BatchSwitchWitness) GetCmd(region *core.RegionInfo, useConfChangeV2 bool) *pdpb.RegionHeartbeatResponse {
+func (bsw BatchSwitchWitness) GetCmd(region *core.RegionInfo, useConfChangeV2 bool) *hbstream.Operation {
 	switches := make([]*pdpb.SwitchWitness, 0, len(bsw.ToWitnesses)+len(bsw.ToNonWitnesses))
 	for _, w := range bsw.ToWitnesses {
 		switches = append(switches, w.GetCmd(region, useConfChangeV2).SwitchWitnesses.SwitchWitnesses...)
@@ -434,7 +435,7 @@ func (bsw BatchSwitchWitness) GetCmd(region *core.RegionInfo, useConfChangeV2 bo
 	for _, nw := range bsw.ToNonWitnesses {
 		switches = append(switches, nw.GetCmd(region, useConfChangeV2).SwitchWitnesses.SwitchWitnesses...)
 	}
-	return &pdpb.RegionHeartbeatResponse{
+	return &hbstream.Operation{
 		SwitchWitnesses: &pdpb.BatchSwitchWitness{
 			SwitchWitnesses: switches,
 		},
@@ -522,7 +523,7 @@ func (al AddLearner) Timeout(regionSize int64) time.Duration {
 }
 
 // GetCmd returns the schedule command for heartbeat response.
-func (al AddLearner) GetCmd(region *core.RegionInfo, useConfChangeV2 bool) *pdpb.RegionHeartbeatResponse {
+func (al AddLearner) GetCmd(region *core.RegionInfo, useConfChangeV2 bool) *hbstream.Operation {
 	if region.GetStorePeer(al.ToStore) != nil {
 		// The newly added peer is pending.
 		return nil
@@ -581,7 +582,7 @@ func (pl PromoteLearner) Timeout(regionSize int64) time.Duration {
 }
 
 // GetCmd returns the schedule command for heartbeat response.
-func (pl PromoteLearner) GetCmd(_ *core.RegionInfo, useConfChangeV2 bool) *pdpb.RegionHeartbeatResponse {
+func (pl PromoteLearner) GetCmd(_ *core.RegionInfo, useConfChangeV2 bool) *hbstream.Operation {
 	return createResponse(addNode(pl.PeerID, pl.ToStore, pl.IsWitness), useConfChangeV2)
 }
 
@@ -652,7 +653,7 @@ func (rp RemovePeer) Timeout(regionSize int64) time.Duration {
 }
 
 // GetCmd returns the schedule command for heartbeat response.
-func (rp RemovePeer) GetCmd(region *core.RegionInfo, useConfChangeV2 bool) *pdpb.RegionHeartbeatResponse {
+func (rp RemovePeer) GetCmd(region *core.RegionInfo, useConfChangeV2 bool) *hbstream.Operation {
 	return createResponse(&pdpb.ChangePeer{
 		ChangeType: eraftpb.ConfChangeType_RemoveNode,
 		Peer:       region.GetStorePeer(rp.FromStore),
@@ -714,11 +715,11 @@ func (mr MergeRegion) Timeout(regionSize int64) time.Duration {
 }
 
 // GetCmd returns the schedule command for heartbeat response.
-func (mr MergeRegion) GetCmd(region *core.RegionInfo, useConfChangeV2 bool) *pdpb.RegionHeartbeatResponse {
+func (mr MergeRegion) GetCmd(region *core.RegionInfo, useConfChangeV2 bool) *hbstream.Operation {
 	if mr.IsPassive {
 		return nil
 	}
-	return &pdpb.RegionHeartbeatResponse{
+	return &hbstream.Operation{
 		Merge: &pdpb.Merge{
 			Target: mr.ToRegion,
 		},
@@ -768,8 +769,8 @@ func (sr SplitRegion) Timeout(regionSize int64) time.Duration {
 }
 
 // GetCmd returns the schedule command for heartbeat response.
-func (sr SplitRegion) GetCmd(region *core.RegionInfo, useConfChangeV2 bool) *pdpb.RegionHeartbeatResponse {
-	return &pdpb.RegionHeartbeatResponse{
+func (sr SplitRegion) GetCmd(region *core.RegionInfo, useConfChangeV2 bool) *hbstream.Operation {
+	return &hbstream.Operation{
 		SplitRegion: &pdpb.SplitRegion{
 			Policy: sr.Policy,
 			Keys:   sr.SplitKeys,
@@ -818,7 +819,7 @@ func (dv DemoteVoter) Timeout(regionSize int64) time.Duration {
 }
 
 // GetCmd returns the schedule command for heartbeat response.
-func (dv DemoteVoter) GetCmd(_ *core.RegionInfo, useConfChangeV2 bool) *pdpb.RegionHeartbeatResponse {
+func (dv DemoteVoter) GetCmd(_ *core.RegionInfo, useConfChangeV2 bool) *hbstream.Operation {
 	return createResponse(addLearnerNode(dv.PeerID, dv.ToStore, dv.IsWitness), useConfChangeV2)
 }
 
@@ -940,7 +941,7 @@ func (cpe ChangePeerV2Enter) Timeout(regionSize int64) time.Duration {
 }
 
 // GetCmd returns the schedule command for heartbeat response.
-func (cpe ChangePeerV2Enter) GetCmd(region *core.RegionInfo, useConfChangeV2 bool) *pdpb.RegionHeartbeatResponse {
+func (cpe ChangePeerV2Enter) GetCmd(region *core.RegionInfo, useConfChangeV2 bool) *hbstream.Operation {
 	if !useConfChangeV2 {
 		// only supported in ChangePeerV2
 		return nil
@@ -952,7 +953,7 @@ func (cpe ChangePeerV2Enter) GetCmd(region *core.RegionInfo, useConfChangeV2 boo
 	for _, dv := range cpe.DemoteVoters {
 		changes = append(changes, dv.GetCmd(region, useConfChangeV2).ChangePeerV2.Changes...)
 	}
-	return &pdpb.RegionHeartbeatResponse{
+	return &hbstream.Operation{
 		ChangePeerV2: &pdpb.ChangePeerV2{
 			Changes: changes,
 		},
@@ -1080,12 +1081,12 @@ func (cpl ChangePeerV2Leave) Timeout(regionSize int64) time.Duration {
 }
 
 // GetCmd returns the schedule command for heartbeat response.
-func (cpl ChangePeerV2Leave) GetCmd(region *core.RegionInfo, useConfChangeV2 bool) *pdpb.RegionHeartbeatResponse {
+func (cpl ChangePeerV2Leave) GetCmd(region *core.RegionInfo, useConfChangeV2 bool) *hbstream.Operation {
 	if !useConfChangeV2 {
 		// only supported in ChangePeerV2
 		return nil
 	}
-	return &pdpb.RegionHeartbeatResponse{
+	return &hbstream.Operation{
 		ChangePeerV2: &pdpb.ChangePeerV2{},
 	}
 }
@@ -1143,21 +1144,21 @@ func addLearnerNode(id, storeID uint64, isWitness bool) *pdpb.ChangePeer {
 	}
 }
 
-func createResponse(change *pdpb.ChangePeer, useConfChangeV2 bool) *pdpb.RegionHeartbeatResponse {
+func createResponse(change *pdpb.ChangePeer, useConfChangeV2 bool) *hbstream.Operation {
 	if useConfChangeV2 {
-		return &pdpb.RegionHeartbeatResponse{
+		return &hbstream.Operation{
 			ChangePeerV2: &pdpb.ChangePeerV2{
 				Changes: []*pdpb.ChangePeer{change},
 			},
 		}
 	}
-	return &pdpb.RegionHeartbeatResponse{
+	return &hbstream.Operation{
 		ChangePeer: change,
 	}
 }
 
-func switchWitness(peerID uint64, isWitness bool) *pdpb.RegionHeartbeatResponse {
-	return &pdpb.RegionHeartbeatResponse{
+func switchWitness(peerID uint64, isWitness bool) *hbstream.Operation {
+	return &hbstream.Operation{
 		SwitchWitnesses: &pdpb.BatchSwitchWitness{
 			SwitchWitnesses: []*pdpb.SwitchWitness{{PeerId: peerID, IsWitness: isWitness}},
 		},
diff --git a/server/api/stats.go b/server/api/stats.go
index e8b04ba588e..1798597b6cc 100644
--- a/server/api/stats.go
+++ b/server/api/stats.go
@@ -48,7 +48,7 @@ func (h *statsHandler) GetRegionStatus(w http.ResponseWriter, r *http.Request) {
 	if r.URL.Query().Has("count") {
 		stats = rc.GetRegionCount([]byte(startKey), []byte(endKey))
 	} else {
-		stats = rc.GetRegionStats([]byte(startKey), []byte(endKey))
+		stats = rc.GetRegionStatsByRange([]byte(startKey), []byte(endKey))
 	}
 	h.rd.JSON(w, http.StatusOK, stats)
 }
diff --git a/server/cluster/cluster.go b/server/cluster/cluster.go
index 94761c330b6..dbd640d6e8c 100644
--- a/server/cluster/cluster.go
+++ b/server/cluster/cluster.go
@@ -33,6 +33,7 @@ import (
 	"github.com/pingcap/kvproto/pkg/metapb"
 	"github.com/pingcap/kvproto/pkg/pdpb"
 	"github.com/pingcap/log"
+	"github.com/tikv/pd/pkg/cluster"
 	"github.com/tikv/pd/pkg/core"
 	"github.com/tikv/pd/pkg/core/storelimit"
 	"github.com/tikv/pd/pkg/errs"
@@ -894,11 +895,21 @@ func (c *RaftCluster) GetSuspectRegions() []uint64 {
 	return c.coordinator.GetCheckerController().GetSuspectRegions()
 }
 
-// GetHotStat gets hot stat for test.
+// GetHotStat gets hot stat.
 func (c *RaftCluster) GetHotStat() *statistics.HotStat {
 	return c.hotStat
 }
 
+// GetRegionStats gets region statistics.
+func (c *RaftCluster) GetRegionStats() *statistics.RegionStatistics {
+	return c.regionStats
+}
+
+// GetLabelStats gets label statistics.
+func (c *RaftCluster) GetLabelStats() *statistics.LabelStatistics {
+	return c.labelLevelStats
+}
+
 // RemoveSuspectRegion removes region from suspect list.
 func (c *RaftCluster) RemoveSuspectRegion(id uint64) {
 	c.coordinator.GetCheckerController().RemoveSuspectRegion(id)
@@ -1099,15 +1110,7 @@ func (c *RaftCluster) processRegionHeartbeat(region *core.RegionInfo) error {
 	}
 
 	if !c.isAPIServiceMode {
-		c.hotStat.CheckWriteAsync(statistics.NewCheckExpiredItemTask(region))
-		c.hotStat.CheckReadAsync(statistics.NewCheckExpiredItemTask(region))
-		reportInterval := region.GetInterval()
-		interval := reportInterval.GetEndTimestamp() - reportInterval.GetStartTimestamp()
-		for _, peer := range region.GetPeers() {
-			peerInfo := core.NewPeerInfo(peer, region.GetWriteLoads(), interval)
-			c.hotStat.CheckWriteAsync(statistics.NewCheckPeerTask(peerInfo, region))
-		}
-		c.coordinator.GetSchedulersController().CheckTransferWitnessLeader(region)
+		cluster.HandleStatsAsync(c, region)
 	}
 
 	hasRegionStats := c.regionStats != nil
@@ -1140,27 +1143,16 @@ func (c *RaftCluster) processRegionHeartbeat(region *core.RegionInfo) error {
 		if overlaps, err = c.core.AtomicCheckAndPutRegion(region); err != nil {
 			return err
 		}
-
-		for _, item := range overlaps {
-			if !c.isAPIServiceMode {
-				if c.regionStats != nil {
-					c.regionStats.ClearDefunctRegion(item.GetID())
-				}
-				c.labelLevelStats.ClearDefunctRegion(item.GetID())
-			}
-			c.ruleManager.InvalidCache(item.GetID())
+		if !c.isAPIServiceMode {
+			cluster.HandleOverlaps(c, overlaps)
 		}
 		regionUpdateCacheEventCounter.Inc()
 	}
 
 	if !c.isAPIServiceMode {
-		if hasRegionStats {
-			c.regionStats.Observe(region, c.getRegionStoresLocked(region))
-		}
-	}
-	if !c.IsPrepared() && isNew {
-		c.coordinator.GetPrepareChecker().Collect(region)
+		cluster.Collect(c, region, c.GetRegionStores(region), hasRegionStats, isNew, c.IsPrepared())
 	}
+
 	if c.storage != nil {
 		// If there are concurrent heartbeats from the same region, the last write will win even if
 		// writes to storage in the critical area. So don't use mutex to protect it.
@@ -2333,8 +2325,8 @@ func (c *RaftCluster) PutMetaCluster(meta *metapb.Cluster) error {
 	return c.putMetaLocked(typeutil.DeepClone(meta, core.ClusterFactory))
 }
 
-// GetRegionStats returns region statistics from cluster.
-func (c *RaftCluster) GetRegionStats(startKey, endKey []byte) *statistics.RegionStats {
+// GetRegionStatsByRange returns region statistics from cluster.
+func (c *RaftCluster) GetRegionStatsByRange(startKey, endKey []byte) *statistics.RegionStats {
 	return statistics.GetRegionStats(c.core.ScanRegions(startKey, endKey, -1))
 }
 
diff --git a/server/cluster/cluster_test.go b/server/cluster/cluster_test.go
index 5679fd6128d..c9d4d0f8f61 100644
--- a/server/cluster/cluster_test.go
+++ b/server/cluster/cluster_test.go
@@ -3637,7 +3637,7 @@ func TestInterval(t *testing.T) {
 func waitAddLearner(re *require.Assertions, stream mockhbstream.HeartbeatStream, region *core.RegionInfo, storeID uint64) *core.RegionInfo {
 	var res *pdpb.RegionHeartbeatResponse
 	testutil.Eventually(re, func() bool {
-		if res = stream.Recv(); res != nil {
+		if res = stream.Recv().(*pdpb.RegionHeartbeatResponse); res != nil {
 			return res.GetRegionId() == region.GetID() &&
 				res.GetChangePeer().GetChangeType() == eraftpb.ConfChangeType_AddLearnerNode &&
 				res.GetChangePeer().GetPeer().GetStoreId() == storeID
@@ -3653,7 +3653,7 @@ func waitAddLearner(re *require.Assertions, stream mockhbstream.HeartbeatStream,
 func waitPromoteLearner(re *require.Assertions, stream mockhbstream.HeartbeatStream, region *core.RegionInfo, storeID uint64) *core.RegionInfo {
 	var res *pdpb.RegionHeartbeatResponse
 	testutil.Eventually(re, func() bool {
-		if res = stream.Recv(); res != nil {
+		if res = stream.Recv().(*pdpb.RegionHeartbeatResponse); res != nil {
 			return res.GetRegionId() == region.GetID() &&
 				res.GetChangePeer().GetChangeType() == eraftpb.ConfChangeType_AddNode &&
 				res.GetChangePeer().GetPeer().GetStoreId() == storeID
@@ -3670,7 +3670,7 @@ func waitPromoteLearner(re *require.Assertions, stream mockhbstream.HeartbeatStr
 func waitRemovePeer(re *require.Assertions, stream mockhbstream.HeartbeatStream, region *core.RegionInfo, storeID uint64) *core.RegionInfo {
 	var res *pdpb.RegionHeartbeatResponse
 	testutil.Eventually(re, func() bool {
-		if res = stream.Recv(); res != nil {
+		if res = stream.Recv().(*pdpb.RegionHeartbeatResponse); res != nil {
 			return res.GetRegionId() == region.GetID() &&
 				res.GetChangePeer().GetChangeType() == eraftpb.ConfChangeType_RemoveNode &&
 				res.GetChangePeer().GetPeer().GetStoreId() == storeID
@@ -3686,7 +3686,7 @@ func waitRemovePeer(re *require.Assertions, stream mockhbstream.HeartbeatStream,
 func waitTransferLeader(re *require.Assertions, stream mockhbstream.HeartbeatStream, region *core.RegionInfo, storeID uint64) *core.RegionInfo {
 	var res *pdpb.RegionHeartbeatResponse
 	testutil.Eventually(re, func() bool {
-		if res = stream.Recv(); res != nil {
+		if res = stream.Recv().(*pdpb.RegionHeartbeatResponse); res != nil {
 			if res.GetRegionId() == region.GetID() {
 				for _, peer := range append(res.GetTransferLeader().GetPeers(), res.GetTransferLeader().GetPeer()) {
 					if peer.GetStoreId() == storeID {
diff --git a/server/grpc_service.go b/server/grpc_service.go
index 5a483b71818..5e40bc1c732 100644
--- a/server/grpc_service.go
+++ b/server/grpc_service.go
@@ -1093,14 +1093,14 @@ type heartbeatServer struct {
 	closed int32
 }
 
-func (s *heartbeatServer) Send(m *pdpb.RegionHeartbeatResponse) error {
+func (s *heartbeatServer) Send(m core.RegionHeartbeatResponse) error {
 	if atomic.LoadInt32(&s.closed) == 1 {
 		return io.EOF
 	}
 	done := make(chan error, 1)
 	go func() {
 		defer logutil.LogPanic()
-		done <- s.stream.Send(m)
+		done <- s.stream.Send(m.(*pdpb.RegionHeartbeatResponse))
 	}()
 	timer := time.NewTimer(heartbeatSendTimeout)
 	defer timer.Stop()
@@ -1232,6 +1232,9 @@ func (s *GrpcServer) RegionHeartbeat(stream pdpb.PD_RegionHeartbeatServer) error
 		lastForwardedHost string
 		lastBind          time.Time
 		errCh             chan error
+		schedulingStream  schedulingpb.Scheduling_RegionHeartbeatClient
+		cancel1           context.CancelFunc
+		lastPrimaryAddr   string
 	)
 	defer func() {
 		// cancel the forward stream
@@ -1345,6 +1348,55 @@ func (s *GrpcServer) RegionHeartbeat(stream pdpb.PD_RegionHeartbeatServer) error
 			s.hbStreams.SendErr(pdpb.ErrorType_UNKNOWN, msg, request.GetLeader())
 			continue
 		}
+
+		if s.IsAPIServiceMode() {
+			ctx := stream.Context()
+			primaryAddr, _ := s.GetServicePrimaryAddr(ctx, utils.SchedulingServiceName)
+			if schedulingStream == nil || lastPrimaryAddr != primaryAddr {
+				if cancel1 != nil {
+					cancel1()
+				}
+				client, err := s.getDelegateClient(ctx, primaryAddr)
+				if err != nil {
+					log.Error("get delegate client failed", zap.Error(err))
+				}
+
+				log.Info("create region heartbeat forward stream", zap.String("forwarded-host", primaryAddr))
+				schedulingStream, cancel1, err = s.createSchedulingStream(client)
+				if err != nil {
+					log.Error("create region heartbeat forward stream failed", zap.Error(err))
+				} else {
+					lastPrimaryAddr = primaryAddr
+					errCh = make(chan error, 1)
+					go forwardSchedulingToServer(schedulingStream, server, errCh)
+				}
+			}
+			if schedulingStream != nil {
+				req := &schedulingpb.RegionHeartbeatRequest{
+					Header: &schedulingpb.RequestHeader{
+						ClusterId: request.GetHeader().GetClusterId(),
+						SenderId:  request.GetHeader().GetSenderId(),
+					},
+					Region:          request.GetRegion(),
+					Leader:          request.GetLeader(),
+					DownPeers:       request.GetDownPeers(),
+					PendingPeers:    request.GetPendingPeers(),
+					BytesWritten:    request.GetBytesWritten(),
+					BytesRead:       request.GetBytesRead(),
+					KeysWritten:     request.GetKeysWritten(),
+					KeysRead:        request.GetKeysRead(),
+					ApproximateSize: request.GetApproximateSize(),
+					ApproximateKeys: request.GetApproximateKeys(),
+					Interval:        request.GetInterval(),
+					Term:            request.GetTerm(),
+					QueryStats:      request.GetQueryStats(),
+				}
+				if err := schedulingStream.Send(req); err != nil {
+					log.Error("forward region heartbeat failed", zap.Error(err))
+				}
+			}
+		}
+
 		regionHeartbeatHandleDuration.WithLabelValues(storeAddress, storeLabel).Observe(time.Since(start).Seconds())
 		regionHeartbeatCounter.WithLabelValues(storeAddress, storeLabel, "report", "ok").Inc()
 	}
@@ -2294,6 +2346,47 @@ func forwardRegionHeartbeatClientToServer(forwardStream pdpb.PD_RegionHeartbeatC
 	}
 }
 
+func (s *GrpcServer) createSchedulingStream(client *grpc.ClientConn) (schedulingpb.Scheduling_RegionHeartbeatClient, context.CancelFunc, error) {
+	done := make(chan struct{})
+	ctx, cancel := context.WithCancel(s.ctx)
+	go grpcutil.CheckStream(ctx, cancel, done)
+	forwardStream, err := schedulingpb.NewSchedulingClient(client).RegionHeartbeat(ctx)
+	done <- struct{}{}
+	return forwardStream, cancel, err
+}
+
+func forwardSchedulingToServer(forwardStream schedulingpb.Scheduling_RegionHeartbeatClient, server *heartbeatServer, errCh chan error) {
+	defer logutil.LogPanic()
+	defer close(errCh)
+	for {
+		resp, err := forwardStream.Recv()
+		if err != nil {
+			errCh <- errors.WithStack(err)
+			return
+		}
+		response := &pdpb.RegionHeartbeatResponse{
+			Header: &pdpb.ResponseHeader{
+				ClusterId: resp.GetHeader().GetClusterId(),
+				// ignore error here
+			},
+			ChangePeer:      resp.GetChangePeer(),
+			TransferLeader:  resp.GetTransferLeader(),
+			RegionId:        resp.GetRegionId(),
+			RegionEpoch:     resp.GetRegionEpoch(),
+			TargetPeer:      resp.GetTargetPeer(),
+			Merge:           resp.GetMerge(),
+			SplitRegion:     resp.GetSplitRegion(),
+			ChangePeerV2:    resp.GetChangePeerV2(),
+			SwitchWitnesses: resp.GetSwitchWitnesses(),
+		}
+
+		if err := server.Send(response); err != nil {
+			errCh <- errors.WithStack(err)
+			return
+		}
+	}
+}
+
 func (s *GrpcServer) createTSOForwardStream(
 	ctx context.Context, client *grpc.ClientConn,
 ) (tsopb.TSO_TsoClient, context.Context, context.CancelFunc, error) {
diff --git a/server/server.go b/server/server.go
index 9e72477368d..03e036a968e 100644
--- a/server/server.go
+++ b/server/server.go
@@ -485,7 +485,7 @@ func (s *Server) startServer(ctx context.Context) error {
 	}
 	s.keyspaceManager = keyspace.NewKeyspaceManager(s.ctx, s.storage, s.cluster, keyspaceIDAllocator, &s.cfg.Keyspace, s.keyspaceGroupManager)
 	s.safePointV2Manager = gc.NewSafePointManagerV2(s.ctx, s.storage, s.storage, s.storage)
-	s.hbStreams = hbstream.NewHeartbeatStreams(ctx, s.clusterID, s.cluster)
+	s.hbStreams = hbstream.NewHeartbeatStreams(ctx, s.clusterID, "", s.cluster)
 	// initial hot_region_storage in here.
 	s.hotRegionStorage, err = storage.NewHotRegionsStorage(
 		ctx, filepath.Join(s.cfg.DataDir, "hot-region"), s.encryptionKeyManager, s.handler)
diff --git a/tests/integrations/client/go.mod b/tests/integrations/client/go.mod
index cbb2c2d5f46..5423ac85689 100644
--- a/tests/integrations/client/go.mod
+++ b/tests/integrations/client/go.mod
@@ -13,7 +13,7 @@ replace google.golang.org/grpc v1.54.0 => google.golang.org/grpc v1.26.0
 require (
 	github.com/docker/go-units v0.4.0
 	github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00
-	github.com/pingcap/kvproto v0.0.0-20230911090708-d603cce32b96
+	github.com/pingcap/kvproto v0.0.0-20230920042517-db656f45023b
 	github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3
 	github.com/stretchr/testify v1.8.2
 	github.com/tikv/pd v0.0.0-00010101000000-000000000000
diff --git a/tests/integrations/client/go.sum b/tests/integrations/client/go.sum
index f18313cb3bb..1578b194c3c 100644
--- a/tests/integrations/client/go.sum
+++ b/tests/integrations/client/go.sum
@@ -402,8 +402,8 @@ github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c/go.mod h1:X2r9ue
 github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00 h1:C3N3itkduZXDZFh4N3vQ5HEtld3S+Y+StULhWVvumU0=
 github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00/go.mod h1:4qGtCB0QK0wBzKtFEGDhxXnSnbQApw1gc9siScUl8ew=
 github.com/pingcap/kvproto v0.0.0-20191211054548-3c6b38ea5107/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w=
-github.com/pingcap/kvproto v0.0.0-20230911090708-d603cce32b96 h1:Upb52Po0Ev1lPKQdUT4suRwQ5Z49A7gEmJ0trADKftM=
-github.com/pingcap/kvproto v0.0.0-20230911090708-d603cce32b96/go.mod h1:r0q/CFcwvyeRhKtoqzmWMBebrtpIziQQ9vR+JKh1knc=
+github.com/pingcap/kvproto v0.0.0-20230920042517-db656f45023b h1:XwwIxepR+uuSYWhdQtstEdr67XUE7X6lpSIHVh5iWjs=
+github.com/pingcap/kvproto v0.0.0-20230920042517-db656f45023b/go.mod h1:r0q/CFcwvyeRhKtoqzmWMBebrtpIziQQ9vR+JKh1knc=
 github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8=
 github.com/pingcap/log v0.0.0-20210625125904-98ed8e2eb1c7/go.mod h1:8AanEdAHATuRurdGxZXBz0At+9avep+ub7U1AGYLIMM=
 github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 h1:HR/ylkkLmGdSSDaD8IDP+SZrdhV1Kibl9KrHxJ9eciw=
diff --git a/tests/integrations/mcs/go.mod b/tests/integrations/mcs/go.mod
index 50e816cd3ad..2c1dd3b26a1 100644
--- a/tests/integrations/mcs/go.mod
+++ b/tests/integrations/mcs/go.mod
@@ -11,8 +11,9 @@ replace (
 replace google.golang.org/grpc v1.54.0 => google.golang.org/grpc v1.26.0
 
 require (
+	github.com/docker/go-units v0.4.0
 	github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00
-	github.com/pingcap/kvproto v0.0.0-20230911090708-d603cce32b96
+	github.com/pingcap/kvproto v0.0.0-20230920042517-db656f45023b
 	github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3
 	github.com/stretchr/testify v1.8.2
 	github.com/tikv/pd v0.0.0-00010101000000-000000000000
@@ -57,7 +58,6 @@ require (
 	github.com/coreos/go-systemd v0.0.0-20190719114852-fd7a80b32e1f // indirect
 	github.com/coreos/pkg v0.0.0-20180928190104-399ea9e2e55f // indirect
 	github.com/davecgh/go-spew v1.1.1 // indirect
-	github.com/docker/go-units v0.4.0 // indirect
 	github.com/dustin/go-humanize v0.0.0-20171111073723-bb3d318650d4 // indirect
 	github.com/elliotchance/pie/v2 v2.1.0 // indirect
 	github.com/fogleman/gg v1.3.0 // indirect
diff --git a/tests/integrations/mcs/go.sum b/tests/integrations/mcs/go.sum
index 8afbfcdc70e..7260a0b36de 100644
--- a/tests/integrations/mcs/go.sum
+++ b/tests/integrations/mcs/go.sum
@@ -407,8 +407,8 @@ github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c/go.mod h1:X2r9ue
 github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00 h1:C3N3itkduZXDZFh4N3vQ5HEtld3S+Y+StULhWVvumU0=
 github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00/go.mod h1:4qGtCB0QK0wBzKtFEGDhxXnSnbQApw1gc9siScUl8ew=
 github.com/pingcap/kvproto v0.0.0-20191211054548-3c6b38ea5107/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w=
-github.com/pingcap/kvproto v0.0.0-20230911090708-d603cce32b96 h1:Upb52Po0Ev1lPKQdUT4suRwQ5Z49A7gEmJ0trADKftM=
-github.com/pingcap/kvproto v0.0.0-20230911090708-d603cce32b96/go.mod h1:r0q/CFcwvyeRhKtoqzmWMBebrtpIziQQ9vR+JKh1knc=
+github.com/pingcap/kvproto v0.0.0-20230920042517-db656f45023b h1:XwwIxepR+uuSYWhdQtstEdr67XUE7X6lpSIHVh5iWjs=
+github.com/pingcap/kvproto v0.0.0-20230920042517-db656f45023b/go.mod h1:r0q/CFcwvyeRhKtoqzmWMBebrtpIziQQ9vR+JKh1knc=
 github.com/pingcap/log v0.0.0-20210625125904-98ed8e2eb1c7/go.mod h1:8AanEdAHATuRurdGxZXBz0At+9avep+ub7U1AGYLIMM=
 github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 h1:HR/ylkkLmGdSSDaD8IDP+SZrdhV1Kibl9KrHxJ9eciw=
 github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4=
diff --git a/tests/integrations/mcs/scheduling/server_test.go b/tests/integrations/mcs/scheduling/server_test.go
index e469c593b84..45c25f01d1e 100644
--- a/tests/integrations/mcs/scheduling/server_test.go
+++ b/tests/integrations/mcs/scheduling/server_test.go
@@ -18,9 +18,11 @@ import (
 	"context"
 	"fmt"
 	"net/http"
+	"reflect"
 	"testing"
 	"time"
 
+	"github.com/docker/go-units"
 	"github.com/pingcap/failpoint"
 	"github.com/pingcap/kvproto/pkg/metapb"
 	"github.com/pingcap/kvproto/pkg/pdpb"
@@ -287,3 +289,80 @@ func checkEvictLeaderStoreIDs(re *require.Assertions, sc *schedulers.Controller,
 	})
 	re.ElementsMatch(evictStoreIDs, expected)
 }
+
+func (suite *serverTestSuite) TestForwardRegionHeartbeat() {
+	re := suite.Require()
+	tc, err := tests.NewTestSchedulingCluster(suite.ctx, 1, suite.backendEndpoints)
+	re.NoError(err)
+	defer tc.Destroy()
+	tc.WaitForPrimaryServing(re)
+
+	s := &server.GrpcServer{Server: suite.pdLeader.GetServer()}
+	for i := uint64(1); i <= 3; i++ {
+		resp, err := s.PutStore(
+			context.Background(), &pdpb.PutStoreRequest{
+				Header: &pdpb.RequestHeader{ClusterId: suite.pdLeader.GetClusterID()},
+				Store: &metapb.Store{
+					Id:      i,
+					Address: fmt.Sprintf("mock://%d", i),
+					State:   metapb.StoreState_Up,
+					Version: "7.0.0",
+				},
+			},
+		)
+		re.NoError(err)
+		re.Empty(resp.GetHeader().GetError())
+	}
+
+	grpcPDClient := testutil.MustNewGrpcClient(re, suite.pdLeader.GetServer().GetAddr())
+	stream, err := grpcPDClient.RegionHeartbeat(suite.ctx)
+	re.NoError(err)
+	peers := []*metapb.Peer{
+		{Id: 11, StoreId: 1},
+		{Id: 22, StoreId: 2},
+		{Id: 33, StoreId: 3},
+	}
+	queryStats := &pdpb.QueryStats{
+		Get:                    5,
+		Coprocessor:            6,
+		Scan:                   7,
+		Put:                    8,
+		Delete:                 9,
+		DeleteRange:            10,
+		AcquirePessimisticLock: 11,
+		Rollback:               12,
+		Prewrite:               13,
+		Commit:                 14,
+	}
+	interval := &pdpb.TimeInterval{StartTimestamp: 0, EndTimestamp: 10}
+	downPeers := []*pdpb.PeerStats{{Peer: peers[2], DownSeconds: 100}}
+	pendingPeers := []*metapb.Peer{peers[2]}
+	regionReq := &pdpb.RegionHeartbeatRequest{
+		Header:          testutil.NewRequestHeader(suite.pdLeader.GetClusterID()),
+		Region:          &metapb.Region{Id: 10, Peers: peers, StartKey: []byte("a"), EndKey: []byte("b")},
+		Leader:          peers[0],
+		DownPeers:       downPeers,
+		PendingPeers:    pendingPeers,
+		BytesWritten:    10,
+		BytesRead:       20,
+		KeysWritten:     100,
+		KeysRead:        200,
+		ApproximateSize: 30 * units.MiB,
+		ApproximateKeys: 300,
+		Interval:        interval,
+		QueryStats:      queryStats,
+		Term:            1,
+		CpuUsage:        100,
+	}
+	err = stream.Send(regionReq)
+	re.NoError(err)
+	testutil.Eventually(re, func() bool {
+		region := tc.GetPrimaryServer().GetCluster().GetRegion(10)
+		return region.GetBytesRead() == 20 && region.GetBytesWritten() == 10 &&
+			region.GetKeysRead() == 200 && region.GetKeysWritten() == 100 && region.GetTerm() == 1 &&
+			region.GetApproximateKeys() == 300 && region.GetApproximateSize() == 30 &&
+			reflect.DeepEqual(region.GetLeader(), peers[0]) &&
+			reflect.DeepEqual(region.GetInterval(), interval) && region.GetReadQueryNum() == 18 && region.GetWriteQueryNum() == 77 &&
+			reflect.DeepEqual(region.GetDownPeers(), downPeers) && reflect.DeepEqual(region.GetPendingPeers(), pendingPeers)
+	})
+}
diff --git a/tests/integrations/tso/go.mod b/tests/integrations/tso/go.mod
index cea17c73141..1252f121ca5 100644
--- a/tests/integrations/tso/go.mod
+++ b/tests/integrations/tso/go.mod
@@ -13,7 +13,7 @@ replace google.golang.org/grpc v1.54.0 => google.golang.org/grpc v1.26.0
 
 require (
 	github.com/pingcap/failpoint v0.0.0-20220801062533-2eaa32854a6c
-	github.com/pingcap/kvproto v0.0.0-20230911090708-d603cce32b96
+	github.com/pingcap/kvproto v0.0.0-20230920042517-db656f45023b
 	github.com/stretchr/testify v1.8.4
 	github.com/tikv/pd v0.0.0-00010101000000-000000000000
 	github.com/tikv/pd/client v0.0.0-00010101000000-000000000000
diff --git a/tests/integrations/tso/go.sum b/tests/integrations/tso/go.sum
index 5ccf67f8b13..d417ad89432 100644
--- a/tests/integrations/tso/go.sum
+++ b/tests/integrations/tso/go.sum
@@ -401,8 +401,8 @@ github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c/go.mod h1:X2r9ue
 github.com/pingcap/failpoint v0.0.0-20220801062533-2eaa32854a6c h1:CgbKAHto5CQgWM9fSBIvaxsJHuGP0uM74HXtv3MyyGQ=
 github.com/pingcap/failpoint v0.0.0-20220801062533-2eaa32854a6c/go.mod h1:4qGtCB0QK0wBzKtFEGDhxXnSnbQApw1gc9siScUl8ew=
 github.com/pingcap/kvproto v0.0.0-20191211054548-3c6b38ea5107/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w=
-github.com/pingcap/kvproto v0.0.0-20230911090708-d603cce32b96 h1:Upb52Po0Ev1lPKQdUT4suRwQ5Z49A7gEmJ0trADKftM=
-github.com/pingcap/kvproto v0.0.0-20230911090708-d603cce32b96/go.mod h1:r0q/CFcwvyeRhKtoqzmWMBebrtpIziQQ9vR+JKh1knc=
+github.com/pingcap/kvproto v0.0.0-20230920042517-db656f45023b h1:XwwIxepR+uuSYWhdQtstEdr67XUE7X6lpSIHVh5iWjs=
+github.com/pingcap/kvproto v0.0.0-20230920042517-db656f45023b/go.mod h1:r0q/CFcwvyeRhKtoqzmWMBebrtpIziQQ9vR+JKh1knc=
 github.com/pingcap/log v0.0.0-20210625125904-98ed8e2eb1c7/go.mod h1:8AanEdAHATuRurdGxZXBz0At+9avep+ub7U1AGYLIMM=
 github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 h1:HR/ylkkLmGdSSDaD8IDP+SZrdhV1Kibl9KrHxJ9eciw=
 github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4=
diff --git a/tests/server/api/api_test.go b/tests/server/api/api_test.go
index da86a872045..cc35d9eaab3 100644
--- a/tests/server/api/api_test.go
+++ b/tests/server/api/api_test.go
@@ -137,6 +137,7 @@ func (suite *middlewareTestSuite) TearDownSuite() {
 func (suite *middlewareTestSuite) TestRequestInfoMiddleware() {
 	suite.NoError(failpoint.Enable("github.com/tikv/pd/server/api/addRequestInfoMiddleware", "return(true)"))
 	leader := suite.cluster.GetServer(suite.cluster.GetLeader())
+	suite.NotNil(leader)
 
 	input := map[string]interface{}{
 		"enable-audit": "true",
@@ -207,6 +208,7 @@ func BenchmarkDoRequestWithServiceMiddleware(b *testing.B) {
 
 func (suite *middlewareTestSuite) TestRateLimitMiddleware() {
 	leader := suite.cluster.GetServer(suite.cluster.GetLeader())
+	suite.NotNil(leader)
 	input := map[string]interface{}{
 		"enable-rate-limit": "true",
 	}
@@ -371,6 +373,7 @@ func (suite *middlewareTestSuite) TestRateLimitMiddleware() {
 
 func (suite *middlewareTestSuite) TestSwaggerUrl() {
 	leader := suite.cluster.GetServer(suite.cluster.GetLeader())
+	suite.NotNil(leader)
 	req, _ := http.NewRequest(http.MethodGet, leader.GetAddr()+"/swagger/ui/index", nil)
 	resp, err := dialClient.Do(req)
 	suite.NoError(err)
@@ -380,6 +383,7 @@ func (suite *middlewareTestSuite) TestSwaggerUrl() {
 
 func (suite *middlewareTestSuite) TestAuditPrometheusBackend() {
 	leader := suite.cluster.GetServer(suite.cluster.GetLeader())
+	suite.NotNil(leader)
 	input := map[string]interface{}{
 		"enable-audit": "true",
 	}
@@ -448,6 +452,7 @@ func (suite *middlewareTestSuite) TestAuditLocalLogBackend() {
 	fname := testutil.InitTempFileLogger("info")
 	defer os.RemoveAll(fname)
 	leader := suite.cluster.GetServer(suite.cluster.GetLeader())
+	suite.NotNil(leader)
 	input := map[string]interface{}{
 		"enable-audit": "true",
 	}
diff --git a/tools/pd-api-bench/go.mod b/tools/pd-api-bench/go.mod
index bb3d5be530f..21471294afb 100644
--- a/tools/pd-api-bench/go.mod
+++ b/tools/pd-api-bench/go.mod
@@ -70,7 +70,7 @@ require (
 	github.com/pingcap/errcode v0.3.0 // indirect
 	github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c // indirect
 	github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00 // indirect
-	github.com/pingcap/kvproto v0.0.0-20230911090708-d603cce32b96 // indirect
+	github.com/pingcap/kvproto v0.0.0-20230920042517-db656f45023b // indirect
 	github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 // indirect
 	github.com/pingcap/sysutil v1.0.1-0.20230407040306-fb007c5aff21 // indirect
 	github.com/pmezard/go-difflib v1.0.0 // indirect
diff --git a/tools/pd-api-bench/go.sum b/tools/pd-api-bench/go.sum
index 29d39f504df..6a133983e28 100644
--- a/tools/pd-api-bench/go.sum
+++ b/tools/pd-api-bench/go.sum
@@ -263,8 +263,8 @@ github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c/go.mod h1:X2r9ue
 github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00 h1:C3N3itkduZXDZFh4N3vQ5HEtld3S+Y+StULhWVvumU0=
 github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00/go.mod h1:4qGtCB0QK0wBzKtFEGDhxXnSnbQApw1gc9siScUl8ew=
 github.com/pingcap/kvproto v0.0.0-20191211054548-3c6b38ea5107/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w=
-github.com/pingcap/kvproto v0.0.0-20230911090708-d603cce32b96 h1:Upb52Po0Ev1lPKQdUT4suRwQ5Z49A7gEmJ0trADKftM=
-github.com/pingcap/kvproto v0.0.0-20230911090708-d603cce32b96/go.mod h1:r0q/CFcwvyeRhKtoqzmWMBebrtpIziQQ9vR+JKh1knc=
+github.com/pingcap/kvproto v0.0.0-20230920042517-db656f45023b h1:XwwIxepR+uuSYWhdQtstEdr67XUE7X6lpSIHVh5iWjs=
+github.com/pingcap/kvproto v0.0.0-20230920042517-db656f45023b/go.mod h1:r0q/CFcwvyeRhKtoqzmWMBebrtpIziQQ9vR+JKh1knc=
 github.com/pingcap/log v0.0.0-20210625125904-98ed8e2eb1c7/go.mod h1:8AanEdAHATuRurdGxZXBz0At+9avep+ub7U1AGYLIMM=
 github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 h1:HR/ylkkLmGdSSDaD8IDP+SZrdhV1Kibl9KrHxJ9eciw=
 github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4=