diff --git a/pkg/core/region.go b/pkg/core/region.go index 4f7af8cc333..244fef836f8 100644 --- a/pkg/core/region.go +++ b/pkg/core/region.go @@ -2264,13 +2264,3 @@ func NewTestRegionInfo(regionID, storeID uint64, start, end []byte, opts ...Regi } return NewRegionInfo(metaRegion, leader, opts...) } - -// TraverseRegions executes a function on all regions. -// ONLY for simulator now and only for READ. -func (r *RegionsInfo) TraverseRegions(lockedFunc func(*RegionInfo)) { - r.t.RLock() - defer r.t.RUnlock() - for _, item := range r.regions { - lockedFunc(item.RegionInfo) - } -} diff --git a/tools/pd-simulator/main.go b/tools/pd-simulator/main.go index 63ba7f9134d..7fe1d3ecedd 100644 --- a/tools/pd-simulator/main.go +++ b/tools/pd-simulator/main.go @@ -148,6 +148,7 @@ func simStart(pdAddr, statusAddress string, simCase string, simConfig *sc.SimCon } tickInterval := simConfig.SimTickInterval.Duration + ctx, cancel := context.WithCancel(context.Background()) tick := time.NewTicker(tickInterval) defer tick.Stop() sc := make(chan os.Signal, 1) @@ -161,6 +162,10 @@ func simStart(pdAddr, statusAddress string, simCase string, simConfig *sc.SimCon simResult := "FAIL" + go driver.StoresHeartbeat(ctx) + go driver.RegionsHeartbeat(ctx) + go driver.StepRegions(ctx) + EXIT: for { select { @@ -175,6 +180,7 @@ EXIT: } } + cancel() driver.Stop() if len(clean) != 0 && clean[0] != nil { clean[0]() diff --git a/tools/pd-simulator/simulator/cases/cases.go b/tools/pd-simulator/simulator/cases/cases.go index 0ddd66608b1..026e095342b 100644 --- a/tools/pd-simulator/simulator/cases/cases.go +++ b/tools/pd-simulator/simulator/cases/cases.go @@ -101,6 +101,7 @@ var CaseMap = map[string]func(*config.SimConfig) *Case{ "diagnose-label-not-match1": newLabelNotMatch1, "diagnose-label-isolation1": newLabelIsolation1, "diagnose-label-isolation2": newLabelIsolation2, + "stable": newStableEnv, } // NewCase creates a new case. diff --git a/tools/pd-simulator/simulator/cases/stable_env.go b/tools/pd-simulator/simulator/cases/stable_env.go new file mode 100644 index 00000000000..8a015c90ca8 --- /dev/null +++ b/tools/pd-simulator/simulator/cases/stable_env.go @@ -0,0 +1,66 @@ +// Copyright 2024 TiKV Project Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package cases + +import ( + "github.com/docker/go-units" + "github.com/pingcap/kvproto/pkg/metapb" + "github.com/tikv/pd/pkg/core" + sc "github.com/tikv/pd/tools/pd-simulator/simulator/config" + "github.com/tikv/pd/tools/pd-simulator/simulator/info" + "github.com/tikv/pd/tools/pd-simulator/simulator/simutil" +) + +// newStableEnv provides a stable environment for test. +func newStableEnv(config *sc.SimConfig) *Case { + var simCase Case + + totalStore := config.TotalStore + totalRegion := config.TotalRegion + allStores := make(map[uint64]struct{}, totalStore) + arrStoresID := make([]uint64, 0, totalStore) + replica := int(config.ServerConfig.Replication.MaxReplicas) + for i := 0; i < totalStore; i++ { + id := simutil.IDAllocator.NextID() + simCase.Stores = append(simCase.Stores, &Store{ + ID: id, + Status: metapb.StoreState_Up, + }) + allStores[id] = struct{}{} + arrStoresID = append(arrStoresID, id) + } + + for i := 0; i < totalRegion; i++ { + peers := make([]*metapb.Peer, 0, replica) + for j := 0; j < replica; j++ { + peers = append(peers, &metapb.Peer{ + Id: simutil.IDAllocator.NextID(), + StoreId: arrStoresID[(i+j)%totalStore], + }) + } + simCase.Regions = append(simCase.Regions, Region{ + ID: simutil.IDAllocator.NextID(), + Peers: peers, + Leader: peers[0], + Size: 96 * units.MiB, + Keys: 960000, + }) + } + + simCase.Checker = func(_ []*metapb.Store, _ *core.RegionsInfo, _ []info.StoreStats) bool { + return false + } + return &simCase +} diff --git a/tools/pd-simulator/simulator/config/config.go b/tools/pd-simulator/simulator/config/config.go index 6598cf35c0f..030655bd3dc 100644 --- a/tools/pd-simulator/simulator/config/config.go +++ b/tools/pd-simulator/simulator/config/config.go @@ -36,6 +36,7 @@ const ( defaultTotalStore = 3 defaultTotalRegion = 1000 defaultEnableTransferRegionCounter = false + defaultHibernatePercent = 0 // store defaultStoreIOMBPerSecond = 40 defaultStoreHeartbeat = 10 * time.Second @@ -62,6 +63,7 @@ type SimConfig struct { TotalRegion int `toml:"total-region"` EnableTransferRegionCounter bool `toml:"enable-transfer-region-counter"` SimTickInterval typeutil.Duration `toml:"sim-tick-interval"` + HibernatePercent int `toml:"hibernate-percent"` // store StoreIOMBPerSecond int64 `toml:"store-io-per-second"` StoreVersion string `toml:"store-version"` @@ -107,6 +109,7 @@ func (sc *SimConfig) Adjust(meta *toml.MetaData) error { configutil.AdjustDuration(&sc.SimTickInterval, defaultSimTickInterval) configutil.AdjustInt(&sc.TotalStore, defaultTotalStore) configutil.AdjustInt(&sc.TotalRegion, defaultTotalRegion) + configutil.AdjustInt(&sc.HibernatePercent, defaultHibernatePercent) configutil.AdjustBool(&sc.EnableTransferRegionCounter, defaultEnableTransferRegionCounter) configutil.AdjustInt64(&sc.StoreIOMBPerSecond, defaultStoreIOMBPerSecond) configutil.AdjustString(&sc.StoreVersion, versioninfo.PDReleaseVersion) diff --git a/tools/pd-simulator/simulator/drive.go b/tools/pd-simulator/simulator/drive.go index 8c511b5ac5c..87b779559ca 100644 --- a/tools/pd-simulator/simulator/drive.go +++ b/tools/pd-simulator/simulator/drive.go @@ -16,6 +16,7 @@ package simulator import ( "context" + "math/rand" "net/http" "net/http/pprof" "path" @@ -47,12 +48,18 @@ type Driver struct { pdAddr string statusAddress string simCase *cases.Case - tickCount int64 eventRunner *EventRunner raftEngine *RaftEngine conn *Connection simConfig *config.SimConfig pdConfig *config.PDConfig + + tick struct { + count int64 + region chan int64 + store chan int64 + stepRegion chan int64 + } } // NewDriver returns a driver. @@ -64,17 +71,22 @@ func NewDriver(pdAddr, statusAddress, caseName string, simConfig *config.SimConf pdConfig := &config.PDConfig{} pdConfig.PlacementRules = simCase.Rules pdConfig.LocationLabels = simCase.Labels - return &Driver{ + driver := Driver{ pdAddr: pdAddr, statusAddress: statusAddress, simCase: simCase, simConfig: simConfig, pdConfig: pdConfig, - }, nil + } + driver.tick.stepRegion = make(chan int64, 1) + driver.tick.region = make(chan int64, 1) + driver.tick.store = make(chan int64, 1) + return &driver, nil } // Prepare initializes cluster information, bootstraps cluster and starts nodes. func (d *Driver) Prepare() error { + simutil.Logger.Info("prepare cluster") conn, err := NewConnection(d.simCase, d.simConfig) if err != nil { return err @@ -166,15 +178,136 @@ func (d *Driver) updateNodesClient() error { // Tick invokes nodes' Tick. func (d *Driver) Tick() { - d.tickCount++ - d.raftEngine.stepRegions() - d.eventRunner.Tick(d.tickCount) + d.tick.count++ + curTick := d.tick.count + go func() { + d.tick.stepRegion <- curTick + }() + go func() { + d.tick.region <- curTick + }() + go func() { + d.tick.store <- curTick + }() +} + +func (d *Driver) StepRegions(ctx context.Context) { + for { + select { + case tick := <-d.tick.stepRegion: + d.raftEngine.stepRegions() + d.eventRunner.Tick(tick) + for _, n := range d.conn.Nodes { + n.reportRegionChange() + d.wg.Add(1) + go n.Tick(&d.wg) + } + d.wg.Wait() + case <-ctx.Done(): + return + } + } +} + +func (d *Driver) StoresHeartbeat(ctx context.Context) { + config := d.raftEngine.storeConfig + storeInterval := uint64(config.RaftStore.StoreHeartBeatInterval.Duration / config.SimTickInterval.Duration) + var wg sync.WaitGroup + for { + select { + case tick := <-d.tick.store: + if uint64(tick)%storeInterval == 0 { + for _, n := range d.conn.Nodes { + wg.Add(1) + go n.storeHeartBeat(&wg) + } + wg.Wait() + } + case <-ctx.Done(): + return + } + } +} + +func (d *Driver) RegionsHeartbeat(ctx context.Context) { + // ensure only wait for the first time heartbeat done + firstReport := true + config := d.raftEngine.storeConfig + regionInterval := uint64(config.RaftStore.RegionHeartBeatInterval.Duration / config.SimTickInterval.Duration) + nodesChannel := make(map[uint64]chan *core.RegionInfo, len(d.conn.Nodes)) for _, n := range d.conn.Nodes { - n.reportRegionChange() - d.wg.Add(1) - go n.Tick(&d.wg) + nodesChannel[n.Store.GetId()] = make(chan *core.RegionInfo, d.simConfig.TotalRegion) + go func(storeID uint64, ch chan *core.RegionInfo) { + for { + select { + case region := <-ch: + d.conn.Nodes[storeID].regionHeartBeat(region) + case <-ctx.Done(): + close(ch) + return + } + } + }(n.Store.GetId(), nodesChannel[n.Store.GetId()]) + } + + for { + select { + case tick := <-d.tick.region: + if uint64(tick)%regionInterval == 0 { + regions := d.raftEngine.GetRegions() + healthyNodes := make(map[uint64]bool) + for _, n := range d.conn.Nodes { + if n.GetNodeState() != metapb.NodeState_Preparing && n.GetNodeState() != metapb.NodeState_Serving { + healthyNodes[n.Store.GetId()] = false + } else { + healthyNodes[n.Store.GetId()] = true + } + } + report := 0 + for _, region := range regions { + hibernatePercent := d.simConfig.HibernatePercent + // using rand(0,100) to meet hibernatePercent + if !firstReport && rand.Intn(100) < hibernatePercent { + continue + } + + if region.GetLeader() != nil { + storeID := region.GetLeader().GetStoreId() + if healthy, ok := healthyNodes[storeID]; !ok || !healthy { + continue + } + nodesChannel[storeID] <- region.Clone() + report++ + } + } + + // Only set HaltSchedule to false when the leader count is 80% of the total region count. + // using firstReport to avoid the haltSchedule set to true manually. + if HaltSchedule.Load() && firstReport { + storeInterval := uint64(config.RaftStore.StoreHeartBeatInterval.Duration / config.SimTickInterval.Duration) + ticker := time.NewTicker(time.Duration(storeInterval)) + for range ticker.C { + // need to wait for first time heartbeat done + stores, _ := PDHTTPClient.GetStores(ctx) + var leaderCount int64 + for _, store := range stores.Stores { + leaderCount += store.Status.LeaderCount + } + // Add halt schedule check to avoid the situation that the leader count is always less than 80%. + if leaderCount > int64(float64(d.simConfig.TotalRegion)*0.8) || !HaltSchedule.Load() { + ChooseToHaltPDSchedule(false) + firstReport = false + ticker.Stop() + simutil.Logger.Info("first region heartbeat done", zap.Int64("leaderCount", leaderCount), zap.Int("checkRegions", len(regions))) + break + } + } + } + } + case <-ctx.Done(): + return + } } - d.wg.Wait() } var HaltSchedule atomic.Bool @@ -197,6 +330,7 @@ func (d *Driver) Check() bool { // Start starts all nodes. func (d *Driver) Start() error { + simutil.Logger.Info("init nodes") if err := d.updateNodesClient(); err != nil { return err } @@ -221,7 +355,7 @@ func (d *Driver) Stop() { // TickCount returns the simulation's tick count. func (d *Driver) TickCount() int64 { - return d.tickCount + return d.tick.count } // GetBootstrapInfo returns a valid bootstrap store and region. diff --git a/tools/pd-simulator/simulator/node.go b/tools/pd-simulator/simulator/node.go index 2059107227e..59b0d393c47 100644 --- a/tools/pd-simulator/simulator/node.go +++ b/tools/pd-simulator/simulator/node.go @@ -129,7 +129,6 @@ func (n *Node) Tick(wg *sync.WaitGroup) { if n.GetNodeState() != metapb.NodeState_Preparing && n.GetNodeState() != metapb.NodeState_Serving { return } - n.stepHeartBeat() n.stepCompaction() n.stepTask() n.tick++ @@ -154,29 +153,14 @@ func (n *Node) stepTask() { } } -var schedulerCheck sync.Once - -func (n *Node) stepHeartBeat() { - config := n.raftEngine.storeConfig - - period := uint64(config.RaftStore.StoreHeartBeatInterval.Duration / config.SimTickInterval.Duration) - if n.tick%period == 0 { - n.storeHeartBeat() - } - period = uint64(config.RaftStore.RegionHeartBeatInterval.Duration / config.SimTickInterval.Duration) - if n.tick%period == 0 { - n.regionHeartBeat() - schedulerCheck.Do(func() { ChooseToHaltPDSchedule(false) }) - } -} - func (n *Node) stepCompaction() { if n.tick%compactionDelayPeriod == 0 { n.compaction() } } -func (n *Node) storeHeartBeat() { +func (n *Node) storeHeartBeat(wg *sync.WaitGroup) { + defer wg.Done() if n.GetNodeState() != metapb.NodeState_Preparing && n.GetNodeState() != metapb.NodeState_Serving { return } @@ -205,26 +189,16 @@ func (n *Node) compaction() { n.stats.ToCompactionSize = 0 } -func (n *Node) regionHeartBeat() { - if n.GetNodeState() != metapb.NodeState_Preparing && n.GetNodeState() != metapb.NodeState_Serving { - return +func (n *Node) regionHeartBeat(region *core.RegionInfo) { + ctx, cancel := context.WithTimeout(n.ctx, pdTimeout) + err := n.client.RegionHeartbeat(ctx, region) + if err != nil { + simutil.Logger.Info("report region heartbeat error", + zap.Uint64("node-id", n.Id), + zap.Uint64("region-id", region.GetID()), + zap.Error(err)) } - n.raftEngine.TraverseRegions(func(region *core.RegionInfo) { - if region.GetLeader() != nil && region.GetLeader().GetStoreId() == n.Id { - ctx, cancel := context.WithTimeout(n.ctx, pdTimeout) - if region == nil { - simutil.Logger.Fatal("region not found") - } - err := n.client.RegionHeartbeat(ctx, region.Clone()) - if err != nil { - simutil.Logger.Info("report region heartbeat error", - zap.Uint64("node-id", n.Id), - zap.Uint64("region-id", region.GetID()), - zap.Error(err)) - } - cancel() - } - }) + cancel() } func (n *Node) reportRegionChange() { diff --git a/tools/pd-simulator/simulator/raft.go b/tools/pd-simulator/simulator/raft.go index 727cc6ab805..9a219d32f9f 100644 --- a/tools/pd-simulator/simulator/raft.go +++ b/tools/pd-simulator/simulator/raft.go @@ -255,11 +255,6 @@ func (r *RaftEngine) ResetRegionChange(storeID uint64, regionID uint64) { } } -// TraverseRegions executes a function on all regions, and function need to be self-locked. -func (r *RaftEngine) TraverseRegions(lockedFunc func(*core.RegionInfo)) { - r.regionsInfo.TraverseRegions(lockedFunc) -} - // GetRegions gets all RegionInfo from regionMap func (r *RaftEngine) GetRegions() []*core.RegionInfo { r.RLock()