tools/simulator: Make simulator work with large scale cluster (#8269)
close #5683, ref #8135

Signed-off-by: husharp <[email protected]>

Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com>
HuSharp and ti-chi-bot[bot] committed Jul 26, 2024
1 parent e8c05e0 commit 2baee83
Showing 8 changed files with 232 additions and 63 deletions.
10 changes: 0 additions & 10 deletions pkg/core/region.go
@@ -2264,13 +2264,3 @@ func NewTestRegionInfo(regionID, storeID uint64, start, end []byte, opts ...Regi
}
return NewRegionInfo(metaRegion, leader, opts...)
}

// TraverseRegions executes a function on all regions.
// ONLY for simulator now and only for READ.
func (r *RegionsInfo) TraverseRegions(lockedFunc func(*RegionInfo)) {
r.t.RLock()
defer r.t.RUnlock()
for _, item := range r.regions {
lockedFunc(item.RegionInfo)
}
}
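For context, TraverseRegions ran its callback while holding RegionsInfo's read lock; the heartbeat loops in drive.go below instead take a snapshot via GetRegions and iterate outside the lock. A minimal sketch of the two styles, using a hypothetical stand-in rather than the real core types:

package main

import (
	"fmt"
	"sync"
)

// regionsInfo is a hypothetical stand-in for core.RegionsInfo:
// a map guarded by an RWMutex.
type regionsInfo struct {
	mu      sync.RWMutex
	regions map[uint64]string
}

// traverse mirrors the removed TraverseRegions: the callback runs while the
// read lock is held, so a slow callback blocks writers for the whole walk.
func (r *regionsInfo) traverse(f func(string)) {
	r.mu.RLock()
	defer r.mu.RUnlock()
	for _, v := range r.regions {
		f(v)
	}
}

// snapshot copies under the lock and lets callers iterate without holding it,
// the style the simulator's heartbeat loops use below.
func (r *regionsInfo) snapshot() []string {
	r.mu.RLock()
	defer r.mu.RUnlock()
	out := make([]string, 0, len(r.regions))
	for _, v := range r.regions {
		out = append(out, v)
	}
	return out
}

func main() {
	r := &regionsInfo{regions: map[uint64]string{1: "region-a", 2: "region-b"}}
	r.traverse(func(v string) { fmt.Println("locked walk:", v) })
	for _, v := range r.snapshot() {
		fmt.Println("snapshot walk:", v)
	}
}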
6 changes: 6 additions & 0 deletions tools/pd-simulator/main.go
@@ -148,6 +148,7 @@ func simStart(pdAddr, statusAddress string, simCase string, simConfig *sc.SimCon
}
tickInterval := simConfig.SimTickInterval.Duration

ctx, cancel := context.WithCancel(context.Background())
tick := time.NewTicker(tickInterval)
defer tick.Stop()
sc := make(chan os.Signal, 1)
@@ -161,6 +162,10 @@ func simStart(pdAddr, statusAddress string, simCase string, simConfig *sc.SimCon

simResult := "FAIL"

go driver.StoresHeartbeat(ctx)
go driver.RegionsHeartbeat(ctx)
go driver.StepRegions(ctx)

EXIT:
for {
select {
@@ -175,6 +180,7 @@
}
}

cancel()
driver.Stop()
if len(clean) != 0 && clean[0] != nil {
clean[0]()
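The wiring above is a standard context-cancellation lifecycle: the three heartbeat loops run as goroutines bound to ctx, and cancel() fires before driver.Stop() so they exit cleanly. A minimal, self-contained sketch of the pattern (names are illustrative, not the simulator's):

package main

import (
	"context"
	"fmt"
	"time"
)

// worker is a stand-in for one long-running loop such as StoresHeartbeat:
// it does periodic work until the shared context is cancelled.
func worker(ctx context.Context, name string) {
	for {
		select {
		case <-time.After(100 * time.Millisecond):
			fmt.Println(name, "tick")
		case <-ctx.Done():
			fmt.Println(name, "stopped")
			return
		}
	}
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	go worker(ctx, "stores-heartbeat")
	go worker(ctx, "regions-heartbeat")
	go worker(ctx, "step-regions")

	time.Sleep(350 * time.Millisecond) // stands in for the main select loop
	cancel()                           // mirrors the cancel() before driver.Stop()
	time.Sleep(50 * time.Millisecond)  // give workers time to observe Done
}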
1 change: 1 addition & 0 deletions tools/pd-simulator/simulator/cases/cases.go
@@ -101,6 +101,7 @@ var CaseMap = map[string]func(*config.SimConfig) *Case{
"diagnose-label-not-match1": newLabelNotMatch1,
"diagnose-label-isolation1": newLabelIsolation1,
"diagnose-label-isolation2": newLabelIsolation2,
"stable": newStableEnv,
}

// NewCase creates a new case.
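With this entry registered, the case can presumably be selected by name at startup (for example pd-simulator -case stable, assuming the simulator's existing case-name flag); NewCase resolves the name through CaseMap and calls the constructor with the parsed SimConfig.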
66 changes: 66 additions & 0 deletions tools/pd-simulator/simulator/cases/stable_env.go
@@ -0,0 +1,66 @@
// Copyright 2024 TiKV Project Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package cases

import (
"github.com/docker/go-units"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/tikv/pd/pkg/core"
sc "github.com/tikv/pd/tools/pd-simulator/simulator/config"
"github.com/tikv/pd/tools/pd-simulator/simulator/info"
"github.com/tikv/pd/tools/pd-simulator/simulator/simutil"
)

// newStableEnv provides a stable environment for testing.
func newStableEnv(config *sc.SimConfig) *Case {
var simCase Case

totalStore := config.TotalStore
totalRegion := config.TotalRegion
allStores := make(map[uint64]struct{}, totalStore)
arrStoresID := make([]uint64, 0, totalStore)
replica := int(config.ServerConfig.Replication.MaxReplicas)
for i := 0; i < totalStore; i++ {
id := simutil.IDAllocator.NextID()
simCase.Stores = append(simCase.Stores, &Store{
ID: id,
Status: metapb.StoreState_Up,
})
allStores[id] = struct{}{}
arrStoresID = append(arrStoresID, id)
}

for i := 0; i < totalRegion; i++ {
peers := make([]*metapb.Peer, 0, replica)
for j := 0; j < replica; j++ {
peers = append(peers, &metapb.Peer{
Id: simutil.IDAllocator.NextID(),
StoreId: arrStoresID[(i+j)%totalStore],
})
}
simCase.Regions = append(simCase.Regions, Region{
ID: simutil.IDAllocator.NextID(),
Peers: peers,
Leader: peers[0],
Size: 96 * units.MiB,
Keys: 960000,
})
}

simCase.Checker = func(_ []*metapb.Store, _ *core.RegionsInfo, _ []info.StoreStats) bool {
return false
}
return &simCase
}
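The (i+j)%totalStore indexing round-robins each region's replicas across the stores, and the checker's unconditional false keeps the case from ever self-terminating, which is what makes the environment stable. A standalone worked sketch of the placement, with hypothetical sizes:

package main

import "fmt"

func main() {
	// With 5 stores and 3 replicas, region i places peers on stores
	// (i+0)%5, (i+1)%5, (i+2)%5; peers[0] is the leader, so leaders and
	// followers spread evenly across stores.
	totalStore, replica := 5, 3
	for i := 0; i < 4; i++ {
		peers := make([]int, 0, replica)
		for j := 0; j < replica; j++ {
			peers = append(peers, (i+j)%totalStore)
		}
		fmt.Printf("region %d -> stores %v (leader: store %d)\n", i, peers, peers[0])
	}
}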
3 changes: 3 additions & 0 deletions tools/pd-simulator/simulator/config/config.go
@@ -36,6 +36,7 @@ const (
defaultTotalStore = 3
defaultTotalRegion = 1000
defaultEnableTransferRegionCounter = false
defaultHibernatePercent = 0
// store
defaultStoreIOMBPerSecond = 40
defaultStoreHeartbeat = 10 * time.Second
@@ -62,6 +63,7 @@ type SimConfig struct {
TotalRegion int `toml:"total-region"`
EnableTransferRegionCounter bool `toml:"enable-transfer-region-counter"`
SimTickInterval typeutil.Duration `toml:"sim-tick-interval"`
HibernatePercent int `toml:"hibernate-percent"`
// store
StoreIOMBPerSecond int64 `toml:"store-io-per-second"`
StoreVersion string `toml:"store-version"`
@@ -107,6 +109,7 @@ func (sc *SimConfig) Adjust(meta *toml.MetaData) error {
configutil.AdjustDuration(&sc.SimTickInterval, defaultSimTickInterval)
configutil.AdjustInt(&sc.TotalStore, defaultTotalStore)
configutil.AdjustInt(&sc.TotalRegion, defaultTotalRegion)
configutil.AdjustInt(&sc.HibernatePercent, defaultHibernatePercent)
configutil.AdjustBool(&sc.EnableTransferRegionCounter, defaultEnableTransferRegionCounter)
configutil.AdjustInt64(&sc.StoreIOMBPerSecond, defaultStoreIOMBPerSecond)
configutil.AdjustString(&sc.StoreVersion, versioninfo.PDReleaseVersion)
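A hypothetical TOML snippet exercising the new knob; the keys are the toml tags from this diff, and the values are illustrative only:

total-store = 100
total-region = 400000
sim-tick-interval = "100ms"
# After the first full report, roughly this percentage of regions skip each
# region-heartbeat round (default 0, i.e. every region reports every round).
hibernate-percent = 70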
156 changes: 145 additions & 11 deletions tools/pd-simulator/simulator/drive.go
@@ -16,6 +16,7 @@ package simulator

import (
"context"
"math/rand"
"net/http"
"net/http/pprof"
"path"
@@ -47,12 +48,18 @@ type Driver struct {
pdAddr string
statusAddress string
simCase *cases.Case
tickCount int64
eventRunner *EventRunner
raftEngine *RaftEngine
conn *Connection
simConfig *config.SimConfig
pdConfig *config.PDConfig

tick struct {
count int64
region chan int64
store chan int64
stepRegion chan int64
}
}

// NewDriver returns a driver.
@@ -64,17 +71,22 @@ func NewDriver(pdAddr, statusAddress, caseName string, simConfig *config.SimConf
pdConfig := &config.PDConfig{}
pdConfig.PlacementRules = simCase.Rules
pdConfig.LocationLabels = simCase.Labels
return &Driver{
driver := Driver{
pdAddr: pdAddr,
statusAddress: statusAddress,
simCase: simCase,
simConfig: simConfig,
pdConfig: pdConfig,
}, nil
}
driver.tick.stepRegion = make(chan int64, 1)
driver.tick.region = make(chan int64, 1)
driver.tick.store = make(chan int64, 1)
return &driver, nil
}

// Prepare initializes cluster information, bootstraps cluster and starts nodes.
func (d *Driver) Prepare() error {
simutil.Logger.Info("prepare cluster")
conn, err := NewConnection(d.simCase, d.simConfig)
if err != nil {
return err
@@ -166,15 +178,136 @@ func (d *Driver) updateNodesClient() error {

// Tick invokes nodes' Tick.
func (d *Driver) Tick() {
d.tickCount++
d.raftEngine.stepRegions()
d.eventRunner.Tick(d.tickCount)
d.tick.count++
curTick := d.tick.count
go func() {
d.tick.stepRegion <- curTick
}()
go func() {
d.tick.region <- curTick
}()
go func() {
d.tick.store <- curTick
}()
}
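Each tick is fanned out to the three loops through the size-1 buffered channels, and the sends are wrapped in goroutines so a busy consumer cannot stall Tick itself. A minimal sketch of that fan-out shape (illustrative names, not the simulator's types):

package main

import (
	"fmt"
	"time"
)

func main() {
	// Size-1 buffered channels, as in Driver.tick above.
	stepRegion := make(chan int64, 1)
	store := make(chan int64, 1)

	go func() { // fast consumer doing per-tick work
		for t := range stepRegion {
			fmt.Println("step regions at tick", t)
		}
	}()
	go func() { // slower consumer; the goroutine-wrapped send keeps the ticker unblocked
		for t := range store {
			time.Sleep(20 * time.Millisecond)
			fmt.Println("store heartbeat at tick", t)
		}
	}()

	for tick := int64(1); tick <= 3; tick++ {
		cur := tick
		go func() { stepRegion <- cur }()
		go func() { store <- cur }()
		time.Sleep(10 * time.Millisecond)
	}
	time.Sleep(200 * time.Millisecond) // let consumers drain before exit
}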

func (d *Driver) StepRegions(ctx context.Context) {
for {
select {
case tick := <-d.tick.stepRegion:
d.raftEngine.stepRegions()
d.eventRunner.Tick(tick)
for _, n := range d.conn.Nodes {
n.reportRegionChange()
d.wg.Add(1)
go n.Tick(&d.wg)
}
d.wg.Wait()
case <-ctx.Done():
return
}
}
}

func (d *Driver) StoresHeartbeat(ctx context.Context) {
config := d.raftEngine.storeConfig
storeInterval := uint64(config.RaftStore.StoreHeartBeatInterval.Duration / config.SimTickInterval.Duration)
var wg sync.WaitGroup
for {
select {
case tick := <-d.tick.store:
if uint64(tick)%storeInterval == 0 {
for _, n := range d.conn.Nodes {
wg.Add(1)
go n.storeHeartBeat(&wg)
}
wg.Wait()
}
case <-ctx.Done():
return
}
}
}
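The division above converts a wall-clock heartbeat interval into a tick count: with the default 10s store heartbeat and a sim tick of, say, 100ms, storeInterval comes out to 100, so every node sends a store heartbeat once per 100 simulator ticks, and the WaitGroup keeps each round from overlapping the next.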

func (d *Driver) RegionsHeartbeat(ctx context.Context) {
// firstReport ensures we only wait for the initial round of heartbeats to finish
firstReport := true
config := d.raftEngine.storeConfig
regionInterval := uint64(config.RaftStore.RegionHeartBeatInterval.Duration / config.SimTickInterval.Duration)
nodesChannel := make(map[uint64]chan *core.RegionInfo, len(d.conn.Nodes))
for _, n := range d.conn.Nodes {
n.reportRegionChange()
d.wg.Add(1)
go n.Tick(&d.wg)
nodesChannel[n.Store.GetId()] = make(chan *core.RegionInfo, d.simConfig.TotalRegion)
go func(storeID uint64, ch chan *core.RegionInfo) {
for {
select {
case region := <-ch:
d.conn.Nodes[storeID].regionHeartBeat(region)
case <-ctx.Done():
close(ch)
return
}
}
}(n.Store.GetId(), nodesChannel[n.Store.GetId()])
}

for {
select {
case tick := <-d.tick.region:
if uint64(tick)%regionInterval == 0 {
regions := d.raftEngine.GetRegions()
healthyNodes := make(map[uint64]bool)
for _, n := range d.conn.Nodes {
if n.GetNodeState() != metapb.NodeState_Preparing && n.GetNodeState() != metapb.NodeState_Serving {
healthyNodes[n.Store.GetId()] = false
} else {
healthyNodes[n.Store.GetId()] = true
}
}
report := 0
for _, region := range regions {
hibernatePercent := d.simConfig.HibernatePercent
// sample in [0,100) so that roughly hibernatePercent of regions skip this round
if !firstReport && rand.Intn(100) < hibernatePercent {
continue
}

if region.GetLeader() != nil {
storeID := region.GetLeader().GetStoreId()
if healthy, ok := healthyNodes[storeID]; !ok || !healthy {
continue
}
nodesChannel[storeID] <- region.Clone()
report++
}
}

// Only set HaltSchedule back to false once the leader count reaches 80% of the total region count.
// firstReport guards this so a HaltSchedule set to true manually is not overridden.
if HaltSchedule.Load() && firstReport {
storeInterval := uint64(config.RaftStore.StoreHeartBeatInterval.Duration / config.SimTickInterval.Duration)
ticker := time.NewTicker(time.Duration(storeInterval))
for range ticker.C {
// wait for the first round of heartbeats to finish before counting leaders
stores, _ := PDHTTPClient.GetStores(ctx)
var leaderCount int64
for _, store := range stores.Stores {
leaderCount += store.Status.LeaderCount
}
// Also check HaltSchedule so this loop cannot spin forever if the leader count never reaches 80%.
if leaderCount > int64(float64(d.simConfig.TotalRegion)*0.8) || !HaltSchedule.Load() {
ChooseToHaltPDSchedule(false)
firstReport = false
ticker.Stop()
simutil.Logger.Info("first region heartbeat done", zap.Int64("leaderCount", leaderCount), zap.Int("checkRegions", len(regions)))
break
}
}
}
}
case <-ctx.Done():
return
}
}
d.wg.Wait()
}
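Two details carry the large-scale behavior here. First, after the initial full report each region is skipped with probability hibernate-percent/100, so with hibernate-percent = 70 the steady-state region-heartbeat volume drops to roughly 30% of the region count. Second, scheduling is only resumed (ChooseToHaltPDSchedule(false)) once PD reports a leader count above 80% of total-region, i.e. once most regions have finished their first heartbeat.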

var HaltSchedule atomic.Bool
@@ -197,6 +330,7 @@ func (d *Driver) Check() bool {

// Start starts all nodes.
func (d *Driver) Start() error {
simutil.Logger.Info("init nodes")
if err := d.updateNodesClient(); err != nil {
return err
}
@@ -221,7 +355,7 @@ func (d *Driver) Stop() {

// TickCount returns the simulation's tick count.
func (d *Driver) TickCount() int64 {
return d.tickCount
return d.tick.count
}

// GetBootstrapInfo returns a valid bootstrap store and region.