diff --git a/client/http/interface.go b/client/http/interface.go index 11c24beaefd..cefdf5e9ff6 100644 --- a/client/http/interface.go +++ b/client/http/interface.go @@ -49,6 +49,7 @@ type Client interface { GetRegionStatusByKeyRange(context.Context, *KeyRange, bool) (*RegionStats, error) GetStores(context.Context) (*StoresInfo, error) GetStore(context.Context, uint64) (*StoreInfo, error) + DeleteStore(context.Context, uint64) error SetStoreLabels(context.Context, int64, map[string]string) error GetHealthStatus(context.Context) ([]Health, error) /* Config-related interfaces */ @@ -440,6 +441,13 @@ func (c *client) GetStore(ctx context.Context, storeID uint64) (*StoreInfo, erro return &store, nil } +func (c *client) DeleteStore(ctx context.Context, storeID uint64) error { + return c.request(ctx, newRequestInfo(). + WithName(deleteStoreName). + WithURI(StoreByID(storeID)). + WithMethod(http.MethodDelete)) +} + // GetClusterVersion gets the cluster version. func (c *client) GetClusterVersion(ctx context.Context) (string, error) { var version string diff --git a/client/http/request_info.go b/client/http/request_info.go index 3fb91c6ca97..40bd0368250 100644 --- a/client/http/request_info.go +++ b/client/http/request_info.go @@ -39,6 +39,7 @@ const ( getRegionStatusByKeyRangeName = "GetRegionStatusByKeyRange" getStoresName = "GetStores" getStoreName = "GetStore" + deleteStoreName = "DeleteStore" setStoreLabelsName = "SetStoreLabels" getHealthStatusName = "GetHealthStatus" getConfigName = "GetConfig" diff --git a/conf/simconfig.toml b/conf/simconfig.toml index 428ee61e508..c0edb182652 100644 --- a/conf/simconfig.toml +++ b/conf/simconfig.toml @@ -1,8 +1,10 @@ # PD Simulator Configuration -[tick] ## the tick interval when starting PD inside (default: "100ms") sim-tick-interval = "100ms" +total-store = 10 +total-region = 10000 +case-name = "balance-leader" [store] ## the capacity size of a new store in GB (default: 1024) @@ -11,8 +13,8 @@ store-capacity = 1024 store-available = 1024 ## the io rate of a new store in MB/s (default: 40) store-io-per-second = 40 -## the version of a new store (default: "2.1.0") -store-version = "2.1.0" +## the version of a new store (default: "8.1.0") +store-version = "8.1.0" ## the meaning of these configurations below are similar with config.toml [server] diff --git a/tools/pd-simulator/main.go b/tools/pd-simulator/main.go index 45b3ecd75c9..2e3e12c60d9 100644 --- a/tools/pd-simulator/main.go +++ b/tools/pd-simulator/main.go @@ -17,6 +17,7 @@ package main import ( "context" "fmt" + pdHttp "github.com/tikv/pd/client/http" "os" "os/signal" "syscall" @@ -92,6 +93,7 @@ func main() { func run(simCase string, simConfig *sc.SimConfig) { if *pdAddr != "" { + simulator.PDHTTPClient = pdHttp.NewClient("pd-simulator", []string{*pdAddr}) simStart(*pdAddr, *statusAddress, simCase, simConfig) } else { local, clean := NewSingleServer(context.Background(), simConfig) @@ -105,6 +107,7 @@ func run(simCase string, simConfig *sc.SimConfig) { } time.Sleep(100 * time.Millisecond) } + simulator.PDHTTPClient = pdHttp.NewClient("pd-simulator", []string{local.GetAddr()}) simStart(local.GetAddr(), "", simCase, simConfig, clean) } } diff --git a/tools/pd-simulator/simulator/cases/balance_leader.go b/tools/pd-simulator/simulator/cases/balance_leader.go index fd9028bc91a..1dad09850a5 100644 --- a/tools/pd-simulator/simulator/cases/balance_leader.go +++ b/tools/pd-simulator/simulator/cases/balance_leader.go @@ -28,12 +28,15 @@ func newBalanceLeader(config *sc.SimConfig) *Case { totalStore := config.TotalStore totalRegion := config.TotalRegion + allStores := make(map[uint64]struct{}, totalStore) replica := int(config.ServerConfig.Replication.MaxReplicas) for i := 0; i < totalStore; i++ { + id := simutil.IDAllocator.NextID() simCase.Stores = append(simCase.Stores, &Store{ - ID: simutil.IDAllocator.NextID(), + ID: id, Status: metapb.StoreState_Up, }) + allStores[id] = struct{}{} } leaderStoreID := simCase.Stores[totalStore-1].ID @@ -58,10 +61,18 @@ func newBalanceLeader(config *sc.SimConfig) *Case { }) } - simCase.Checker = func(regions *core.RegionsInfo, _ []info.StoreStats) bool { - for i := 1; i <= totalStore; i++ { - leaderCount := regions.GetStoreLeaderCount(uint64(i)) - if !isUniform(leaderCount, totalRegion/totalStore) { + simCase.Checker = func(stores []*metapb.Store, regions *core.RegionsInfo, _ []info.StoreStats) bool { + for _, store := range stores { + if store.NodeState == metapb.NodeState_Removed { + delete(allStores, store.GetId()) + } + } + if len(allStores) == 0 { + return false + } + for storeID := range allStores { + leaderCount := regions.GetStoreLeaderCount(storeID) + if !isUniform(leaderCount, totalRegion/len(allStores)) { return false } } diff --git a/tools/pd-simulator/simulator/cases/balance_region.go b/tools/pd-simulator/simulator/cases/balance_region.go index 82a7ac2d704..ddb6c6f4cae 100644 --- a/tools/pd-simulator/simulator/cases/balance_region.go +++ b/tools/pd-simulator/simulator/cases/balance_region.go @@ -30,6 +30,7 @@ func newRedundantBalanceRegion(config *sc.SimConfig) *Case { totalStore := config.TotalStore totalRegion := config.TotalRegion replica := int(config.ServerConfig.Replication.MaxReplicas) + allStores := make(map[uint64]struct{}, totalStore) for i := 0; i < totalStore; i++ { s := &Store{ @@ -40,6 +41,7 @@ func newRedundantBalanceRegion(config *sc.SimConfig) *Case { s.HasExtraUsedSpace = true } simCase.Stores = append(simCase.Stores, s) + allStores[s.ID] = struct{}{} } for i := 0; i < totalRegion; i++ { @@ -57,21 +59,27 @@ func newRedundantBalanceRegion(config *sc.SimConfig) *Case { }) } - storesLastUpdateTime := make([]int64, totalStore+1) - storeLastAvailable := make([]uint64, totalStore+1) - simCase.Checker = func(_ *core.RegionsInfo, stats []info.StoreStats) bool { + storesLastUpdateTime := make(map[uint64]int64, totalStore) + storeLastAvailable := make(map[uint64]uint64, totalStore) + simCase.Checker = func(stores []*metapb.Store, regions *core.RegionsInfo, stats []info.StoreStats) bool { + for _, store := range stores { + if store.NodeState == metapb.NodeState_Removed { + delete(allStores, store.GetId()) + } + } + curTime := time.Now().Unix() - for i := 1; i <= totalStore; i++ { - available := stats[i].GetAvailable() - if curTime-storesLastUpdateTime[i] > 60 { - if storeLastAvailable[i] != available { + for storeID := range allStores { + available := stats[storeID].GetAvailable() + if curTime-storesLastUpdateTime[storeID] > 60 { + if storeLastAvailable[storeID] != available { return false } - if stats[i].ToCompactionSize != 0 { + if stats[storeID].ToCompactionSize != 0 { return false } - storesLastUpdateTime[i] = curTime - storeLastAvailable[i] = available + storesLastUpdateTime[storeID] = curTime + storeLastAvailable[storeID] = available } else { return false } diff --git a/tools/pd-simulator/simulator/cases/cases.go b/tools/pd-simulator/simulator/cases/cases.go index 00b5404669f..238b54c935a 100644 --- a/tools/pd-simulator/simulator/cases/cases.go +++ b/tools/pd-simulator/simulator/cases/cases.go @@ -16,8 +16,8 @@ package cases import ( "github.com/pingcap/kvproto/pkg/metapb" + pdHttp "github.com/tikv/pd/client/http" "github.com/tikv/pd/pkg/core" - "github.com/tikv/pd/pkg/schedule/placement" "github.com/tikv/pd/pkg/utils/typeutil" "github.com/tikv/pd/tools/pd-simulator/simulator/config" "github.com/tikv/pd/tools/pd-simulator/simulator/info" @@ -45,7 +45,7 @@ type Region struct { } // CheckerFunc checks if the scheduler is finished. -type CheckerFunc func(*core.RegionsInfo, []info.StoreStats) bool +type CheckerFunc func([]*metapb.Store, *core.RegionsInfo, []info.StoreStats) bool // Case represents a test suite for simulator. type Case struct { @@ -57,7 +57,7 @@ type Case struct { TableNumber int Checker CheckerFunc // To check the schedule is finished. - Rules []*placement.Rule + Rules []*pdHttp.Rule Labels typeutil.StringSlice } diff --git a/tools/pd-simulator/simulator/cases/diagnose_label_isolation.go b/tools/pd-simulator/simulator/cases/diagnose_label_isolation.go index 09037136608..2be71b9ed64 100644 --- a/tools/pd-simulator/simulator/cases/diagnose_label_isolation.go +++ b/tools/pd-simulator/simulator/cases/diagnose_label_isolation.go @@ -33,6 +33,7 @@ func newLabelNotMatch1(_ *sc.SimConfig) *Case { num1, num2 := 3, 1 storeNum, regionNum := num1+num2, 200 + allStores := make(map[uint64]struct{}, storeNum+1) for i := 0; i < num1; i++ { id := IDAllocator.nextID() simCase.Stores = append(simCase.Stores, &Store{ @@ -40,11 +41,14 @@ func newLabelNotMatch1(_ *sc.SimConfig) *Case { Status: metapb.StoreState_Up, Labels: []*metapb.StoreLabel{{Key: "host", Value: fmt.Sprintf("host%d", id)}}, }) + allStores[id] = struct{}{} } + id := IDAllocator.nextID() simCase.Stores = append(simCase.Stores, &Store{ - ID: IDAllocator.nextID(), + ID: id, Status: metapb.StoreState_Up, }) + allStores[id] = struct{}{} for i := 0; i < regionNum; i++ { peers := []*metapb.Peer{ @@ -61,24 +65,30 @@ func newLabelNotMatch1(_ *sc.SimConfig) *Case { }) } - storesLastUpdateTime := make([]int64, storeNum+1) - storeLastAvailable := make([]uint64, storeNum+1) - simCase.Checker = func(_ *core.RegionsInfo, stats []info.StoreStats) bool { + storesLastUpdateTime := make(map[uint64]int64, storeNum+1) + storeLastAvailable := make(map[uint64]uint64, storeNum+1) + simCase.Checker = func(stores []*metapb.Store, regions *core.RegionsInfo, stats []info.StoreStats) bool { + for _, store := range stores { + if store.NodeState == metapb.NodeState_Removed { + delete(allStores, store.GetId()) + } + } + res := true curTime := time.Now().Unix() storesAvailable := make([]uint64, 0, storeNum+1) - for i := 1; i <= storeNum; i++ { - available := stats[i].GetAvailable() + for storeID := range allStores { + available := stats[storeID].GetAvailable() storesAvailable = append(storesAvailable, available) - if curTime-storesLastUpdateTime[i] > 360 { - if storeLastAvailable[i] != available { + if curTime-storesLastUpdateTime[storeID] > 360 { + if storeLastAvailable[storeID] != available { res = false } - if stats[i].ToCompactionSize != 0 { + if stats[storeID].ToCompactionSize != 0 { res = false } - storesLastUpdateTime[i] = curTime - storeLastAvailable[i] = available + storesLastUpdateTime[storeID] = curTime + storeLastAvailable[storeID] = available } else { res = false } @@ -95,6 +105,7 @@ func newLabelIsolation1(_ *sc.SimConfig) *Case { num1, num2 := 2, 2 storeNum, regionNum := num1+num2, 300 + allStores := make(map[uint64]struct{}, storeNum+1) for i := 0; i < num1; i++ { id := IDAllocator.nextID() simCase.Stores = append(simCase.Stores, &Store{ @@ -102,14 +113,16 @@ func newLabelIsolation1(_ *sc.SimConfig) *Case { Status: metapb.StoreState_Up, Labels: []*metapb.StoreLabel{{Key: "host", Value: fmt.Sprintf("host%d", id)}}, }) + allStores[id] = struct{}{} } - id := IDAllocator.GetID() + 1 for i := 0; i < num2; i++ { + id := IDAllocator.nextID() simCase.Stores = append(simCase.Stores, &Store{ - ID: IDAllocator.nextID(), + ID: id, Status: metapb.StoreState_Up, Labels: []*metapb.StoreLabel{{Key: "host", Value: fmt.Sprintf("host%d", id)}}, }) + allStores[id] = struct{}{} } for i := 0; i < regionNum; i++ { @@ -127,24 +140,30 @@ func newLabelIsolation1(_ *sc.SimConfig) *Case { }) } - storesLastUpdateTime := make([]int64, storeNum+1) - storeLastAvailable := make([]uint64, storeNum+1) - simCase.Checker = func(_ *core.RegionsInfo, stats []info.StoreStats) bool { + storesLastUpdateTime := make(map[uint64]int64, storeNum) + storeLastAvailable := make(map[uint64]uint64, storeNum) + simCase.Checker = func(stores []*metapb.Store, regions *core.RegionsInfo, stats []info.StoreStats) bool { + for _, store := range stores { + if store.NodeState == metapb.NodeState_Removed { + delete(allStores, store.GetId()) + } + } + res := true curTime := time.Now().Unix() storesAvailable := make([]uint64, 0, storeNum+1) - for i := 1; i <= storeNum; i++ { - available := stats[i].GetAvailable() + for storeID := range allStores { + available := stats[storeID].GetAvailable() storesAvailable = append(storesAvailable, available) - if curTime-storesLastUpdateTime[i] > 360 { - if storeLastAvailable[i] != available { + if curTime-storesLastUpdateTime[storeID] > 360 { + if storeLastAvailable[storeID] != available { res = false } - if stats[i].ToCompactionSize != 0 { + if stats[storeID].ToCompactionSize != 0 { res = false } - storesLastUpdateTime[i] = curTime - storeLastAvailable[i] = available + storesLastUpdateTime[storeID] = curTime + storeLastAvailable[storeID] = available } else { res = false } @@ -160,12 +179,14 @@ func newLabelIsolation2(_ *sc.SimConfig) *Case { simCase.Labels = []string{"dc", "zone", "host"} storeNum, regionNum := 5, 200 + allStores := make(map[uint64]struct{}, storeNum) for i := 0; i < storeNum; i++ { id := IDAllocator.nextID() simCase.Stores = append(simCase.Stores, &Store{ ID: id, Status: metapb.StoreState_Up, }) + allStores[id] = struct{}{} } simCase.Stores[0].Labels = []*metapb.StoreLabel{{Key: "dc", Value: "dc1"}, {Key: "zone", Value: "zone1"}, {Key: "host", Value: "host1"}} simCase.Stores[1].Labels = []*metapb.StoreLabel{{Key: "dc", Value: "dc1"}, {Key: "zone", Value: "zone1"}, {Key: "host", Value: "host2"}} @@ -188,24 +209,30 @@ func newLabelIsolation2(_ *sc.SimConfig) *Case { }) } - storesLastUpdateTime := make([]int64, storeNum+1) - storeLastAvailable := make([]uint64, storeNum+1) - simCase.Checker = func(_ *core.RegionsInfo, stats []info.StoreStats) bool { + storesLastUpdateTime := make(map[uint64]int64, storeNum) + storeLastAvailable := make(map[uint64]uint64, storeNum) + simCase.Checker = func(stores []*metapb.Store, regions *core.RegionsInfo, stats []info.StoreStats) bool { + for _, store := range stores { + if store.NodeState == metapb.NodeState_Removed { + delete(allStores, store.GetId()) + } + } + res := true curTime := time.Now().Unix() storesAvailable := make([]uint64, 0, storeNum+1) - for i := 1; i <= storeNum; i++ { - available := stats[i].GetAvailable() + for storeID := range allStores { + available := stats[storeID].GetAvailable() storesAvailable = append(storesAvailable, available) - if curTime-storesLastUpdateTime[i] > 360 { - if storeLastAvailable[i] != available { + if curTime-storesLastUpdateTime[storeID] > 360 { + if storeLastAvailable[storeID] != available { res = false } - if stats[i].ToCompactionSize != 0 { + if stats[storeID].ToCompactionSize != 0 { res = false } - storesLastUpdateTime[i] = curTime - storeLastAvailable[i] = available + storesLastUpdateTime[storeID] = curTime + storeLastAvailable[storeID] = available } else { res = false } diff --git a/tools/pd-simulator/simulator/cases/diagnose_rule.go b/tools/pd-simulator/simulator/cases/diagnose_rule.go index 5d34e051071..3a2314be713 100644 --- a/tools/pd-simulator/simulator/cases/diagnose_rule.go +++ b/tools/pd-simulator/simulator/cases/diagnose_rule.go @@ -15,6 +15,7 @@ package cases import ( + pdHttp "github.com/tikv/pd/client/http" "time" "github.com/docker/go-units" @@ -30,15 +31,15 @@ import ( func newRule1(_ *sc.SimConfig) *Case { var simCase Case - simCase.Rules = make([]*placement.Rule, 0) - simCase.Rules = append(simCase.Rules, &placement.Rule{ + simCase.Rules = make([]*pdHttp.Rule, 0) + simCase.Rules = append(simCase.Rules, &pdHttp.Rule{ GroupID: "test1", ID: "test1", StartKeyHex: "", EndKeyHex: "", - Role: placement.Learner, + Role: pdHttp.Learner, Count: 1, - LabelConstraints: []placement.LabelConstraint{ + LabelConstraints: []pdHttp.LabelConstraint{ { Key: "region", Op: "in", @@ -46,14 +47,14 @@ func newRule1(_ *sc.SimConfig) *Case { }, }, LocationLabels: []string{"host"}, - }, &placement.Rule{ + }, &pdHttp.Rule{ GroupID: placement.DefaultGroupID, ID: placement.DefaultRuleID, StartKeyHex: "", EndKeyHex: "", - Role: placement.Voter, + Role: pdHttp.Voter, Count: 5, - LabelConstraints: []placement.LabelConstraint{ + LabelConstraints: []pdHttp.LabelConstraint{ { Key: "region", Op: "in", @@ -64,12 +65,14 @@ func newRule1(_ *sc.SimConfig) *Case { }) storeNum, regionNum := 9, 300 + allStores := make(map[uint64]struct{}, storeNum) for i := 0; i < storeNum; i++ { id := IDAllocator.nextID() simCase.Stores = append(simCase.Stores, &Store{ ID: id, Status: metapb.StoreState_Up, }) + allStores[id] = struct{}{} } simCase.Stores[0].Labels = []*metapb.StoreLabel{{Key: "region", Value: "region2"}, {Key: "idc", Value: "idc1"}} simCase.Stores[1].Labels = []*metapb.StoreLabel{{Key: "region", Value: "region2"}, {Key: "idc", Value: "idc1"}} @@ -99,24 +102,30 @@ func newRule1(_ *sc.SimConfig) *Case { }) } - storesLastUpdateTime := make([]int64, storeNum+1) - storeLastAvailable := make([]uint64, storeNum+1) - simCase.Checker = func(_ *core.RegionsInfo, stats []info.StoreStats) bool { + storesLastUpdateTime := make(map[uint64]int64, storeNum) + storeLastAvailable := make(map[uint64]uint64, storeNum) + simCase.Checker = func(stores []*metapb.Store, regions *core.RegionsInfo, stats []info.StoreStats) bool { + for _, store := range stores { + if store.NodeState == metapb.NodeState_Removed { + delete(allStores, store.GetId()) + } + } + res := true curTime := time.Now().Unix() storesAvailable := make([]uint64, 0, storeNum+1) - for i := 1; i <= storeNum; i++ { - available := stats[i].GetAvailable() + for storeID := range allStores { + available := stats[storeID].GetAvailable() storesAvailable = append(storesAvailable, available) - if curTime-storesLastUpdateTime[i] > 360 { - if storeLastAvailable[i] != available { + if curTime-storesLastUpdateTime[storeID] > 360 { + if storeLastAvailable[storeID] != available { res = false } - if stats[i].ToCompactionSize != 0 { + if stats[storeID].ToCompactionSize != 0 { res = false } - storesLastUpdateTime[i] = curTime - storeLastAvailable[i] = available + storesLastUpdateTime[storeID] = curTime + storeLastAvailable[storeID] = available } else { res = false } @@ -130,16 +139,16 @@ func newRule1(_ *sc.SimConfig) *Case { func newRule2(_ *sc.SimConfig) *Case { var simCase Case - simCase.Rules = make([]*placement.Rule, 0) + simCase.Rules = make([]*pdHttp.Rule, 0) simCase.Rules = append(simCase.Rules, - &placement.Rule{ + &pdHttp.Rule{ GroupID: "test1", ID: "test1", StartKeyHex: "", EndKeyHex: "", - Role: placement.Leader, + Role: pdHttp.Leader, Count: 1, - LabelConstraints: []placement.LabelConstraint{ + LabelConstraints: []pdHttp.LabelConstraint{ { Key: "region", Op: "in", @@ -149,12 +158,14 @@ func newRule2(_ *sc.SimConfig) *Case { }) storeNum, regionNum := 6, 300 + allStores := make(map[uint64]struct{}, storeNum) for i := 0; i < storeNum; i++ { id := IDAllocator.nextID() simCase.Stores = append(simCase.Stores, &Store{ ID: id, Status: metapb.StoreState_Up, }) + allStores[id] = struct{}{} } simCase.Stores[0].Labels = []*metapb.StoreLabel{{Key: "region", Value: "region1"}} simCase.Stores[1].Labels = []*metapb.StoreLabel{{Key: "region", Value: "region1"}} @@ -180,22 +191,28 @@ func newRule2(_ *sc.SimConfig) *Case { storesLastUpdateTime := make([]int64, storeNum+1) storeLastAvailable := make([]uint64, storeNum+1) - simCase.Checker = func(_ *core.RegionsInfo, stats []info.StoreStats) bool { + simCase.Checker = func(stores []*metapb.Store, regions *core.RegionsInfo, stats []info.StoreStats) bool { + for _, store := range stores { + if store.NodeState == metapb.NodeState_Removed { + delete(allStores, store.GetId()) + } + } + res := true curTime := time.Now().Unix() storesAvailable := make([]uint64, 0, storeNum+1) - for i := 1; i <= storeNum; i++ { - available := stats[i].GetAvailable() + for storeID := range allStores { + available := stats[storeID].GetAvailable() storesAvailable = append(storesAvailable, available) - if curTime-storesLastUpdateTime[i] > 360 { - if storeLastAvailable[i] != available { + if curTime-storesLastUpdateTime[storeID] > 360 { + if storeLastAvailable[storeID] != available { res = false } - if stats[i].ToCompactionSize != 0 { + if stats[storeID].ToCompactionSize != 0 { res = false } - storesLastUpdateTime[i] = curTime - storeLastAvailable[i] = available + storesLastUpdateTime[storeID] = curTime + storeLastAvailable[storeID] = available } else { res = false } diff --git a/tools/pd-simulator/simulator/cases/hot_read.go b/tools/pd-simulator/simulator/cases/hot_read.go index d154886b0a4..644a009bb92 100644 --- a/tools/pd-simulator/simulator/cases/hot_read.go +++ b/tools/pd-simulator/simulator/cases/hot_read.go @@ -15,6 +15,7 @@ package cases import ( + "fmt" "github.com/docker/go-units" "github.com/pingcap/kvproto/pkg/metapb" "github.com/tikv/pd/pkg/core" @@ -23,18 +24,22 @@ import ( "github.com/tikv/pd/tools/pd-simulator/simulator/simutil" ) +var hotReadStore uint64 = 1 + func newHotRead(config *sc.SimConfig) *Case { var simCase Case totalStore := config.TotalStore totalRegion := config.TotalRegion replica := int(config.ServerConfig.Replication.MaxReplicas) - + allStores := make(map[uint64]struct{}, totalStore) // Initialize the cluster for i := 0; i < totalStore; i++ { + id := simutil.IDAllocator.NextID() simCase.Stores = append(simCase.Stores, &Store{ - ID: simutil.IDAllocator.NextID(), + ID: id, Status: metapb.StoreState_Up, }) + allStores[id] = struct{}{} } for i := 0; i < totalRegion; i++ { @@ -54,12 +59,18 @@ func newHotRead(config *sc.SimConfig) *Case { }) } + // select the first store as hot read store + for store := range allStores { + hotReadStore = store + break + } + // Events description - // select regions on store 1 as hot read regions. + // select regions on `hotReadStore` as hot read regions. selectRegionNum := 4 * totalStore readFlow := make(map[uint64]int64, selectRegionNum) for _, r := range simCase.Regions { - if r.Leader.GetStoreId() == 1 { + if r.Leader.GetStoreId() == hotReadStore { readFlow[r.ID] = 128 * units.MiB if len(readFlow) == selectRegionNum { break @@ -72,15 +83,25 @@ func newHotRead(config *sc.SimConfig) *Case { } simCase.Events = []EventDescriptor{e} // Checker description - simCase.Checker = func(regions *core.RegionsInfo, _ []info.StoreStats) bool { - leaderCount := make([]int, totalStore) + simCase.Checker = func(stores []*metapb.Store, regions *core.RegionsInfo, stats []info.StoreStats) bool { + for _, store := range stores { + if store.NodeState == metapb.NodeState_Removed { + if store.Id == hotReadStore { + simutil.Logger.Error(fmt.Sprintf("hot store %d is removed", hotReadStore)) + return true + } + delete(allStores, store.GetId()) + } + } + + leaderCount := make(map[uint64]int, len(allStores)) for id := range readFlow { leaderStore := regions.GetRegion(id).GetLeader().GetStoreId() - leaderCount[int(leaderStore-1)]++ + leaderCount[leaderStore]++ } // check count diff < 2. - var min, max int + var min, max uint64 for i := range leaderCount { if leaderCount[i] > leaderCount[max] { max = i diff --git a/tools/pd-simulator/simulator/cases/hot_write.go b/tools/pd-simulator/simulator/cases/hot_write.go index e73ca6f3ce3..54f859994ba 100644 --- a/tools/pd-simulator/simulator/cases/hot_write.go +++ b/tools/pd-simulator/simulator/cases/hot_write.go @@ -15,6 +15,7 @@ package cases import ( + "fmt" "github.com/docker/go-units" "github.com/pingcap/kvproto/pkg/metapb" "github.com/tikv/pd/pkg/core" @@ -23,18 +24,22 @@ import ( "github.com/tikv/pd/tools/pd-simulator/simulator/simutil" ) +var hotWriteStore uint64 = 1 + func newHotWrite(config *sc.SimConfig) *Case { var simCase Case totalStore := config.TotalStore totalRegion := config.TotalRegion replica := int(config.ServerConfig.Replication.MaxReplicas) - + allStores := make(map[uint64]struct{}, totalStore) // Initialize the cluster for i := 0; i < totalStore; i++ { + id := simutil.IDAllocator.NextID() simCase.Stores = append(simCase.Stores, &Store{ - ID: simutil.IDAllocator.NextID(), + ID: id, Status: metapb.StoreState_Up, }) + allStores[id] = struct{}{} } for i := 0; i < totalRegion; i++ { @@ -54,14 +59,20 @@ func newHotWrite(config *sc.SimConfig) *Case { }) } + // select the first store as hot write store. + for store := range allStores { + hotWriteStore = store + break + } + // Events description - // select regions on store 1 as hot write regions. - selectStoreNum := totalStore - writeFlow := make(map[uint64]int64, selectStoreNum) + // select regions on `hotWriteStore` as hot write regions. + selectRegionNum := totalStore + writeFlow := make(map[uint64]int64, selectRegionNum) for _, r := range simCase.Regions { - if r.Leader.GetStoreId() == 1 { + if r.Leader.GetStoreId() == hotWriteStore { writeFlow[r.ID] = 2 * units.MiB - if len(writeFlow) == selectStoreNum { + if len(writeFlow) == selectRegionNum { break } } @@ -70,23 +81,31 @@ func newHotWrite(config *sc.SimConfig) *Case { e.Step = func(int64) map[uint64]int64 { return writeFlow } - simCase.Events = []EventDescriptor{e} - // Checker description - simCase.Checker = func(regions *core.RegionsInfo, _ []info.StoreStats) bool { - leaderCount := make([]int, totalStore) - peerCount := make([]int, totalStore) + simCase.Checker = func(stores []*metapb.Store, regions *core.RegionsInfo, stats []info.StoreStats) bool { + for _, store := range stores { + if store.NodeState == metapb.NodeState_Removed { + if store.Id == hotWriteStore { + simutil.Logger.Error(fmt.Sprintf("hot store %d is removed", hotReadStore)) + return true + } + delete(allStores, store.GetId()) + } + } + + leaderCount := make(map[uint64]int, len(allStores)) + peerCount := make(map[uint64]int, totalStore) for id := range writeFlow { region := regions.GetRegion(id) - leaderCount[int(region.GetLeader().GetStoreId()-1)]++ + leaderCount[region.GetLeader().GetStoreId()]++ for _, p := range region.GetPeers() { - peerCount[int(p.GetStoreId()-1)]++ + peerCount[p.GetStoreId()]++ } } // check count diff <= 2. - var minLeader, maxLeader, minPeer, maxPeer int + var minLeader, maxLeader, minPeer, maxPeer uint64 for i := range leaderCount { if leaderCount[i] > leaderCount[maxLeader] { maxLeader = i diff --git a/tools/pd-simulator/simulator/cases/import_data.go b/tools/pd-simulator/simulator/cases/import_data.go index b9f448a6cf6..d9bfb4be6b0 100644 --- a/tools/pd-simulator/simulator/cases/import_data.go +++ b/tools/pd-simulator/simulator/cases/import_data.go @@ -36,13 +36,15 @@ func newImportData(config *sc.SimConfig) *Case { totalStore := config.TotalStore totalRegion := config.TotalRegion replica := int(config.ServerConfig.Replication.MaxReplicas) - + allStores := make(map[uint64]struct{}, totalStore) // Initialize the cluster for i := 0; i < totalStore; i++ { + id := simutil.IDAllocator.NextID() simCase.Stores = append(simCase.Stores, &Store{ - ID: IDAllocator.nextID(), + ID: id, Status: metapb.StoreState_Up, }) + allStores[id] = struct{}{} } for i := 0; i < totalRegion; i++ { @@ -83,7 +85,13 @@ func newImportData(config *sc.SimConfig) *Case { checkCount := uint64(0) var newRegionCount [][3]int var allRegionCount [][3]int - simCase.Checker = func(regions *core.RegionsInfo, _ []info.StoreStats) bool { + simCase.Checker = func(stores []*metapb.Store, regions *core.RegionsInfo, stats []info.StoreStats) bool { + for _, store := range stores { + if store.NodeState == metapb.NodeState_Removed { + delete(allStores, store.GetId()) + } + } + leaderDist := make(map[uint64]int) peerDist := make(map[uint64]int) leaderTotal := 0 @@ -115,9 +123,9 @@ func newImportData(config *sc.SimConfig) *Case { tableLeaderLog = fmt.Sprintf("%s [store %d]:%.2f%%", tableLeaderLog, storeID, float64(leaderCount)/float64(leaderTotal)*100) } } - for storeID := 1; storeID <= 10; storeID++ { - if peerCount, ok := peerDist[uint64(storeID)]; ok { - newRegionCount = append(newRegionCount, [3]int{storeID, int(checkCount), peerCount}) + for storeID := range allStores { + if peerCount, ok := peerDist[storeID]; ok { + newRegionCount = append(newRegionCount, [3]int{int(storeID), int(checkCount), peerCount}) tablePeerLog = fmt.Sprintf("%s [store %d]:%.2f%%", tablePeerLog, storeID, float64(peerCount)/float64(peerTotal)*100) } } @@ -126,7 +134,7 @@ func newImportData(config *sc.SimConfig) *Case { totalPeerLog := fmt.Sprintf("%d peer:", regionTotal*3) isEnd := false var regionProps []float64 - for storeID := uint64(1); storeID <= 10; storeID++ { + for storeID := range allStores { totalLeaderLog = fmt.Sprintf("%s [store %d]:%.2f%%", totalLeaderLog, storeID, float64(regions.GetStoreLeaderCount(storeID))/float64(regionTotal)*100) regionProp := float64(regions.GetStoreRegionCount(storeID)) / float64(regionTotal*3) * 100 regionProps = append(regionProps, regionProp) diff --git a/tools/pd-simulator/simulator/cases/makeup_down_replica.go b/tools/pd-simulator/simulator/cases/makeup_down_replica.go index a5ee63e71a0..3518509b0c4 100644 --- a/tools/pd-simulator/simulator/cases/makeup_down_replica.go +++ b/tools/pd-simulator/simulator/cases/makeup_down_replica.go @@ -69,7 +69,7 @@ func newMakeupDownReplicas(config *sc.SimConfig) *Case { } simCase.Events = []EventDescriptor{e} - simCase.Checker = func(regions *core.RegionsInfo, _ []info.StoreStats) bool { + simCase.Checker = func(_ []*metapb.Store, regions *core.RegionsInfo, stats []info.StoreStats) bool { if !down { return false } diff --git a/tools/pd-simulator/simulator/cases/region_merge.go b/tools/pd-simulator/simulator/cases/region_merge.go index 8097565d1a7..32fb2f2b6f4 100644 --- a/tools/pd-simulator/simulator/cases/region_merge.go +++ b/tools/pd-simulator/simulator/cases/region_merge.go @@ -28,12 +28,15 @@ func newRegionMerge(config *sc.SimConfig) *Case { totalStore := config.TotalStore totalRegion := config.TotalRegion replica := int(config.ServerConfig.Replication.MaxReplicas) + allStores := make(map[uint64]struct{}, totalStore) for i := 0; i < totalStore; i++ { + id := simutil.IDAllocator.NextID() simCase.Stores = append(simCase.Stores, &Store{ - ID: simutil.IDAllocator.NextID(), + ID: id, Status: metapb.StoreState_Up, }) + allStores[id] = struct{}{} } for i := 0; i < totalRegion; i++ { @@ -54,10 +57,16 @@ func newRegionMerge(config *sc.SimConfig) *Case { } // Checker description mergeRatio := 4 // when max-merge-region-size is 20, per region will reach 40MB - simCase.Checker = func(regions *core.RegionsInfo, _ []info.StoreStats) bool { + simCase.Checker = func(stores []*metapb.Store, regions *core.RegionsInfo, stats []info.StoreStats) bool { + for _, store := range stores { + if store.NodeState == metapb.NodeState_Removed { + delete(allStores, store.GetId()) + } + } + currentPeerCount := 0 - for i := 1; i <= totalStore; i++ { - currentPeerCount += regions.GetStoreRegionCount(uint64(i)) + for storeID := range allStores { + currentPeerCount += regions.GetStoreRegionCount(storeID) } return isUniform(currentPeerCount, totalRegion*replica/mergeRatio) } diff --git a/tools/pd-simulator/simulator/cases/region_split.go b/tools/pd-simulator/simulator/cases/region_split.go index 7b712f4dc48..173a54e82cc 100644 --- a/tools/pd-simulator/simulator/cases/region_split.go +++ b/tools/pd-simulator/simulator/cases/region_split.go @@ -25,12 +25,15 @@ import ( func newRegionSplit(config *sc.SimConfig) *Case { var simCase Case totalStore := config.TotalStore + allStores := make(map[uint64]struct{}, totalStore) for i := 0; i < totalStore; i++ { + id := uint64(i) simCase.Stores = append(simCase.Stores, &Store{ - ID: uint64(i), + ID: id, Status: metapb.StoreState_Up, }) + allStores[id] = struct{}{} } peers := []*metapb.Peer{ {Id: 4, StoreId: 1}, @@ -55,9 +58,9 @@ func newRegionSplit(config *sc.SimConfig) *Case { simCase.Events = []EventDescriptor{e} // Checker description - simCase.Checker = func(regions *core.RegionsInfo, _ []info.StoreStats) bool { - for i := 1; i <= totalStore; i++ { - peerCount := regions.GetStoreRegionCount(uint64(i)) + simCase.Checker = func(_ []*metapb.Store, regions *core.RegionsInfo, stats []info.StoreStats) bool { + for storeID := range allStores { + peerCount := regions.GetStoreRegionCount(storeID) if peerCount < 5 { return false } diff --git a/tools/pd-simulator/simulator/client.go b/tools/pd-simulator/simulator/client.go index 50ed57995df..21e37aed83c 100644 --- a/tools/pd-simulator/simulator/client.go +++ b/tools/pd-simulator/simulator/client.go @@ -15,11 +15,8 @@ package simulator import ( - "bytes" "context" - "encoding/json" - "fmt" - "net/http" + pdHttp "github.com/tikv/pd/client/http" "strings" "sync" "time" @@ -28,7 +25,6 @@ import ( "github.com/pingcap/kvproto/pkg/metapb" "github.com/pingcap/kvproto/pkg/pdpb" "github.com/tikv/pd/pkg/core" - "github.com/tikv/pd/pkg/schedule/placement" "github.com/tikv/pd/pkg/utils/typeutil" sc "github.com/tikv/pd/tools/pd-simulator/simulator/config" "github.com/tikv/pd/tools/pd-simulator/simulator/simutil" @@ -44,6 +40,7 @@ type Client interface { AllocID(ctx context.Context) (uint64, error) Bootstrap(ctx context.Context, store *metapb.Store, region *metapb.Region) error PutStore(ctx context.Context, store *metapb.Store) error + DeleteStore(ctx context.Context, storeID uint64) error StoreHeartbeat(ctx context.Context, stats *pdpb.StoreStats) error RegionHeartbeat(ctx context.Context, region *core.RegionInfo) error PutPDConfig(*sc.PDConfig) error @@ -60,6 +57,8 @@ const ( var ( // errFailInitClusterID is returned when failed to load clusterID from all supplied PD addresses. errFailInitClusterID = errors.New("[pd] failed to get cluster id") + + PDHTTPClient pdHttp.Client ) type client struct { @@ -67,7 +66,6 @@ type client struct { tag string clusterID uint64 clientConn *grpc.ClientConn - httpClient *http.Client reportRegionHeartbeatCh chan *core.RegionInfo receiveRegionHeartbeatCh chan *pdpb.RegionHeartbeatResponse @@ -88,7 +86,6 @@ func NewClient(pdAddr string, tag string) (Client, <-chan *pdpb.RegionHeartbeatR ctx: ctx, cancel: cancel, tag: tag, - httpClient: &http.Client{}, } cc, err := c.createConn() if err != nil { @@ -317,48 +314,39 @@ func (c *client) PutStore(ctx context.Context, store *metapb.Store) error { return nil } +func (c *client) DeleteStore(ctx context.Context, storeID uint64) error { + ctx, cancel := context.WithTimeout(ctx, pdTimeout) + err := PDHTTPClient.DeleteStore(ctx, storeID) + cancel() + if err != nil { + return err + } + return nil +} + func (c *client) PutPDConfig(config *sc.PDConfig) error { if len(config.PlacementRules) > 0 { - path := fmt.Sprintf("%s/%s/config/rules/batch", c.url, httpPrefix) - ruleOps := make([]*placement.RuleOp, 0) + ruleOps := make([]*pdHttp.RuleOp, 0) for _, rule := range config.PlacementRules { - ruleOps = append(ruleOps, &placement.RuleOp{ + ruleOps = append(ruleOps, &pdHttp.RuleOp{ Rule: rule, - Action: placement.RuleOpAdd, + Action: pdHttp.RuleOpAdd, }) } - content, _ := json.Marshal(ruleOps) - req, err := http.NewRequest(http.MethodPost, path, bytes.NewBuffer(content)) - req.Header.Add("Content-Type", "application/json") + err := PDHTTPClient.SetPlacementRuleInBatch(c.ctx, ruleOps) if err != nil { return err } - res, err := c.httpClient.Do(req) - if err != nil { - return err - } - defer res.Body.Close() - simutil.Logger.Info("add placement rule success", zap.String("rules", string(content))) + simutil.Logger.Info("add placement rule success", zap.Any("rules", config.PlacementRules)) } if len(config.LocationLabels) > 0 { - path := fmt.Sprintf("%s/%s/config", c.url, httpPrefix) data := make(map[string]any) data["location-labels"] = config.LocationLabels - content, err := json.Marshal(data) - if err != nil { - return err - } - req, err := http.NewRequest(http.MethodPost, path, bytes.NewBuffer(content)) - req.Header.Add("Content-Type", "application/json") - if err != nil { - return err - } - res, err := c.httpClient.Do(req) + err := PDHTTPClient.SetConfig(c.ctx, data) if err != nil { return err } - defer res.Body.Close() - simutil.Logger.Info("add location labels success", zap.String("labels", string(content))) + simutil.Logger.Info("add location labels success", zap.Any("labels", config.LocationLabels)) } return nil } diff --git a/tools/pd-simulator/simulator/config/config.go b/tools/pd-simulator/simulator/config/config.go index 01bf8199ab4..91f36384999 100644 --- a/tools/pd-simulator/simulator/config/config.go +++ b/tools/pd-simulator/simulator/config/config.go @@ -16,13 +16,13 @@ package config import ( "fmt" + pdHttp "github.com/tikv/pd/client/http" "os" "time" "github.com/BurntSushi/toml" "github.com/docker/go-units" sc "github.com/tikv/pd/pkg/schedule/config" - "github.com/tikv/pd/pkg/schedule/placement" "github.com/tikv/pd/pkg/utils/configutil" "github.com/tikv/pd/pkg/utils/tempurl" "github.com/tikv/pd/pkg/utils/typeutil" @@ -133,6 +133,7 @@ func (sc *SimConfig) Speed() uint64 { // PDConfig saves some config which may be changed in PD. type PDConfig struct { - PlacementRules []*placement.Rule - LocationLabels typeutil.StringSlice + PlacementRules []*pdHttp.Rule + LocationLabels typeutil.StringSlice + MaxStoreDownTime typeutil.Duration } diff --git a/tools/pd-simulator/simulator/drive.go b/tools/pd-simulator/simulator/drive.go index 700dd58f87a..bae15a24b23 100644 --- a/tools/pd-simulator/simulator/drive.go +++ b/tools/pd-simulator/simulator/drive.go @@ -160,16 +160,18 @@ func (d *Driver) Tick() { // Check checks if the simulation is completed. func (d *Driver) Check() bool { length := uint64(len(d.conn.Nodes) + 1) - for index := range d.conn.Nodes { + var stores []*metapb.Store + for index, s := range d.conn.Nodes { if index >= length { length = index + 1 } + stores = append(stores, s.Store) } stats := make([]info.StoreStats, length) for index, node := range d.conn.Nodes { stats[index] = *node.stats } - return d.simCase.Checker(d.raftEngine.regionsInfo, stats) + return d.simCase.Checker(stores, d.raftEngine.regionsInfo, stats) } // Start starts all nodes. diff --git a/tools/pd-simulator/simulator/event.go b/tools/pd-simulator/simulator/event.go index 8be8f89d759..198357a9724 100644 --- a/tools/pd-simulator/simulator/event.go +++ b/tools/pd-simulator/simulator/event.go @@ -17,16 +17,16 @@ package simulator import ( "context" "fmt" - "math/rand" - "net/http" - "sync" - "github.com/pingcap/kvproto/pkg/metapb" "github.com/pingcap/kvproto/pkg/pdpb" "github.com/tikv/pd/pkg/core" "github.com/tikv/pd/tools/pd-simulator/simulator/cases" "github.com/tikv/pd/tools/pd-simulator/simulator/simutil" "go.uber.org/zap" + "math/rand" + "net/http" + "strconv" + "sync" ) // Event affects the status of the cluster. @@ -74,7 +74,12 @@ func (e *eventHandler) createEvent(w http.ResponseWriter, r *http.Request) { e.er.addEvent(&AddNode{}) return case "down-node": - e.er.addEvent(&DownNode{}) + id := r.URL.Query().Get("node-id") + var ID int + if len(id) != 0 { + ID, _ = strconv.Atoi(id) + } + e.er.addEvent(&DownNode{ID: ID}) return default: } @@ -200,22 +205,39 @@ func (*AddNode) Run(raft *RaftEngine, _ int64) bool { } // DownNode deletes nodes. -type DownNode struct{} +type DownNode struct { + ID int +} // Run implements the event interface. -func (*DownNode) Run(raft *RaftEngine, _ int64) bool { - nodes := raft.conn.getNodes() +func (e *DownNode) Run(raft *RaftEngine, _ int64) bool { + nodes := raft.conn.Nodes if len(nodes) == 0 { simutil.Logger.Error("can not find any node") return false } - i := rand.Intn(len(nodes)) - node := nodes[i] + var node *Node + if e.ID == 0 { + arrNodes := raft.conn.getNodes() + i := rand.Intn(len(arrNodes)) + node = nodes[arrNodes[i].Store.GetId()] + } else { + node = nodes[uint64(e.ID)] + } if node == nil { simutil.Logger.Error("node is not existed", zap.Uint64("node-id", node.Id)) return false } delete(raft.conn.Nodes, node.Id) + // update store state to removed + node.Store.NodeState = metapb.NodeState_Removed + arrNodes := raft.conn.getNodes() + + err := arrNodes[0].client.DeleteStore(context.TODO(), node.Id) + if err != nil { + simutil.Logger.Error("put store failed", zap.Uint64("node-id", node.Id), zap.Error(err)) + return false + } node.Stop() regions := raft.GetRegions() diff --git a/tools/pd-simulator/simulator/node.go b/tools/pd-simulator/simulator/node.go index 883b5d4474b..c51cdfd8a38 100644 --- a/tools/pd-simulator/simulator/node.go +++ b/tools/pd-simulator/simulator/node.go @@ -72,6 +72,7 @@ func NewNode(s *cases.Store, pdAddr string, config *sc.SimConfig) (*Node, error) StoreId: s.ID, Capacity: uint64(config.RaftStore.Capacity), StartTime: uint32(time.Now().Unix()), + Available: uint64(config.RaftStore.Capacity), }, } tag := fmt.Sprintf("store %d", s.ID)