diff --git a/client/errs/errno.go b/client/errs/errno.go index c3f5c27275a..50c136dd5f2 100644 --- a/client/errs/errno.go +++ b/client/errs/errno.go @@ -91,7 +91,7 @@ var ( var ( ErrClientListResourceGroup = errors.Normalize("get all resource group failed, %v", errors.RFCCodeText("PD:client:ErrClientListResourceGroup")) ErrClientResourceGroupConfigUnavailable = errors.Normalize("resource group config is unavailable, %v", errors.RFCCodeText("PD:client:ErrClientResourceGroupConfigUnavailable")) - ErrClientResourceGroupThrottled = errors.Normalize("exceeded resource group quota limitation", errors.RFCCodeText("PD:client:ErrClientResourceGroupThrottled")) + ErrClientResourceGroupThrottled = errors.Normalize("exceeded resource group quota limitation, estimated wait time %s, ltb state is %.2f:%.2f", errors.RFCCodeText("PD:client:ErrClientResourceGroupThrottled")) ) // ErrClientGetResourceGroup is the error type for getting resource group. diff --git a/client/resource_group/controller/limiter.go b/client/resource_group/controller/limiter.go index 63c94a9782b..7e76934643f 100644 --- a/client/resource_group/controller/limiter.go +++ b/client/resource_group/controller/limiter.go @@ -122,13 +122,14 @@ func NewLimiterWithCfg(now time.Time, cfg tokenBucketReconfigureArgs, lowTokensN // A Reservation holds information about events that are permitted by a Limiter to happen after a delay. // A Reservation may be canceled, which may enable the Limiter to permit additional events. type Reservation struct { - ok bool - lim *Limiter - tokens float64 - timeToAct time.Time - needWaitDurtion time.Duration + ok bool + lim *Limiter + tokens float64 + timeToAct time.Time + needWaitDuration time.Duration // This is the Limit at reservation time, it can change later. - limit Limit + limit Limit + remainingTokens float64 } // OK returns whether the limiter can provide the requested number of tokens @@ -359,10 +360,11 @@ func (lim *Limiter) reserveN(now time.Time, n float64, maxFutureReserve time.Dur // Prepare reservation r := Reservation{ - ok: ok, - lim: lim, - limit: lim.limit, - needWaitDurtion: waitDuration, + ok: ok, + lim: lim, + limit: lim.limit, + needWaitDuration: waitDuration, + remainingTokens: tokens, } if ok { r.tokens = n @@ -380,6 +382,8 @@ func (lim *Limiter) reserveN(now time.Time, n float64, maxFutureReserve time.Dur zap.Float64("current-ltb-tokens", lim.tokens), zap.Float64("current-ltb-rate", float64(lim.limit)), zap.Float64("request-tokens", n), + zap.Float64("notify-threshold", lim.notifyThreshold), + zap.Bool("is-low-process", lim.isLowProcess), zap.Int64("burst", lim.burst), zap.Int("remaining-notify-times", lim.remainingNotifyTimes)) lim.last = last @@ -461,7 +465,7 @@ func WaitReservations(ctx context.Context, now time.Time, reservations []*Reserv for _, res := range reservations { if !res.ok { cancel() - return res.needWaitDurtion, errs.ErrClientResourceGroupThrottled + return res.needWaitDuration, errs.ErrClientResourceGroupThrottled.FastGenByArgs(res.needWaitDuration, res.limit, res.remainingTokens) } delay := res.DelayFrom(now) if delay > longestDelayDuration { diff --git a/client/resource_group/controller/limiter_test.go b/client/resource_group/controller/limiter_test.go index 786e5c51cdf..c9bed856f1c 100644 --- a/client/resource_group/controller/limiter_test.go +++ b/client/resource_group/controller/limiter_test.go @@ -163,6 +163,7 @@ func TestCancel(t *testing.T) { d, err := WaitReservations(ctx, t2, []*Reservation{r1, r2}) re.Equal(4*time.Second, d) re.Error(err) + re.Contains(err.Error(), "estimated wait time 4s, ltb state is 1.00:-4.00") checkTokens(re, lim1, t3, 13) checkTokens(re, lim2, t3, 3) cancel1() diff --git a/go.mod b/go.mod index 7c15f7024a1..2620f5ad0a7 100644 --- a/go.mod +++ b/go.mod @@ -35,7 +35,7 @@ require ( github.com/pingcap/errcode v0.3.0 github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00 - github.com/pingcap/kvproto v0.0.0-20240222024302-881fcbf5bc41 + github.com/pingcap/kvproto v0.0.0-20240403065636-c699538f7aa1 github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 github.com/pingcap/sysutil v1.0.1-0.20230407040306-fb007c5aff21 github.com/pingcap/tidb-dashboard v0.0.0-20240326110213-9768844ff5d7 diff --git a/go.sum b/go.sum index 72d16078de4..d99804c887c 100644 --- a/go.sum +++ b/go.sum @@ -389,8 +389,8 @@ github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c/go.mod h1:X2r9ue github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00 h1:C3N3itkduZXDZFh4N3vQ5HEtld3S+Y+StULhWVvumU0= github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00/go.mod h1:4qGtCB0QK0wBzKtFEGDhxXnSnbQApw1gc9siScUl8ew= github.com/pingcap/kvproto v0.0.0-20191211054548-3c6b38ea5107/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w= -github.com/pingcap/kvproto v0.0.0-20240222024302-881fcbf5bc41 h1:7tDr4J6gGQ3OqBq+lZQkI9wlJIIXFitHjNK8ymU/SEo= -github.com/pingcap/kvproto v0.0.0-20240222024302-881fcbf5bc41/go.mod h1:rXxWk2UnwfUhLXha1jxRWPADw9eMZGWEWCg92Tgmb/8= +github.com/pingcap/kvproto v0.0.0-20240403065636-c699538f7aa1 h1:vDWWJKU6ztczn24XixahtLwcnJ15DOtSRIRM3jVtZNU= +github.com/pingcap/kvproto v0.0.0-20240403065636-c699538f7aa1/go.mod h1:rXxWk2UnwfUhLXha1jxRWPADw9eMZGWEWCg92Tgmb/8= github.com/pingcap/log v0.0.0-20210625125904-98ed8e2eb1c7/go.mod h1:8AanEdAHATuRurdGxZXBz0At+9avep+ub7U1AGYLIMM= github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 h1:HR/ylkkLmGdSSDaD8IDP+SZrdhV1Kibl9KrHxJ9eciw= github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4= diff --git a/pkg/unsaferecovery/unsafe_recovery_controller.go b/pkg/unsaferecovery/unsafe_recovery_controller.go index aa45ba6a2bd..044dbd182e2 100644 --- a/pkg/unsaferecovery/unsafe_recovery_controller.go +++ b/pkg/unsaferecovery/unsafe_recovery_controller.go @@ -34,6 +34,7 @@ import ( sc "github.com/tikv/pd/pkg/schedule/config" "github.com/tikv/pd/pkg/utils/logutil" "github.com/tikv/pd/pkg/utils/syncutil" + "github.com/tikv/pd/pkg/utils/typeutil" "go.uber.org/zap" ) @@ -780,6 +781,12 @@ func (r *regionItem) IsRaftStale(origin *regionItem, u *Controller) bool { func(a, b *regionItem) int { return int(a.report.GetRaftState().GetHardState().GetTerm()) - int(b.report.GetRaftState().GetHardState().GetTerm()) }, + // choose the peer has maximum applied index or last index. + func(a, b *regionItem) int { + maxIdxA := typeutil.MaxUint64(a.report.GetRaftState().GetLastIndex(), a.report.AppliedIndex) + maxIdxB := typeutil.MaxUint64(b.report.GetRaftState().GetLastIndex(), b.report.AppliedIndex) + return int(maxIdxA - maxIdxB) + }, func(a, b *regionItem) int { return int(a.report.GetRaftState().GetLastIndex()) - int(b.report.GetRaftState().GetLastIndex()) }, diff --git a/pkg/unsaferecovery/unsafe_recovery_controller_test.go b/pkg/unsaferecovery/unsafe_recovery_controller_test.go index 6f1fab62164..cce38285212 100644 --- a/pkg/unsaferecovery/unsafe_recovery_controller_test.go +++ b/pkg/unsaferecovery/unsafe_recovery_controller_test.go @@ -1856,3 +1856,105 @@ func newTestStores(n uint64, version string) []*core.StoreInfo { func getTestDeployPath(storeID uint64) string { return fmt.Sprintf("test/store%d", storeID) } + +func TestSelectLeader(t *testing.T) { + re := require.New(t) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + opts := mockconfig.NewTestOptions() + cluster := mockcluster.NewCluster(ctx, opts) + coordinator := schedule.NewCoordinator(ctx, cluster, hbstream.NewTestHeartbeatStreams(ctx, cluster.ID, cluster, true)) + coordinator.Run() + stores := newTestStores(6, "6.0.0") + labels := []*metapb.StoreLabel{ + { + Key: core.EngineKey, + Value: core.EngineTiFlash, + }, + } + stores[5].IsTiFlash() + core.SetStoreLabels(labels)(stores[5]) + for _, store := range stores { + cluster.PutStore(store) + } + recoveryController := NewController(cluster) + + cases := []struct { + peers []*regionItem + leaderID uint64 + }{ + { + peers: []*regionItem{ + newPeer(1, 1, 10, 5, 4), + newPeer(2, 2, 9, 9, 8), + }, + leaderID: 2, + }, + { + peers: []*regionItem{ + newPeer(1, 1, 10, 10, 9), + newPeer(2, 1, 8, 8, 15), + newPeer(3, 1, 12, 11, 11), + }, + leaderID: 2, + }, + { + peers: []*regionItem{ + newPeer(1, 1, 9, 9, 11), + newPeer(2, 1, 10, 8, 7), + newPeer(3, 1, 11, 7, 6), + }, + leaderID: 3, + }, + { + peers: []*regionItem{ + newPeer(1, 1, 11, 11, 8), + newPeer(2, 1, 11, 10, 10), + newPeer(3, 1, 11, 9, 8), + }, + leaderID: 1, + }, + { + peers: []*regionItem{ + newPeer(6, 1, 11, 11, 9), + newPeer(1, 1, 11, 11, 8), + newPeer(2, 1, 11, 10, 10), + newPeer(3, 1, 11, 9, 8), + }, + leaderID: 1, + }, + } + + for i, c := range cases { + peersMap := map[uint64][]*regionItem{ + 1: c.peers, + } + region := &metapb.Region{ + Id: 1, + } + leader := recoveryController.selectLeader(peersMap, region) + re.Equal(leader.Region().Id, c.leaderID, "case: %d", i) + } +} + +func newPeer(storeID, term, lastIndex, committedIndex, appliedIndex uint64) *regionItem { + return ®ionItem{ + storeID: storeID, + report: &pdpb.PeerReport{ + RaftState: &raft_serverpb.RaftLocalState{ + HardState: &eraftpb.HardState{ + Term: term, + Commit: committedIndex, + }, + LastIndex: lastIndex, + }, + RegionState: &raft_serverpb.RegionLocalState{ + Region: &metapb.Region{ + Id: storeID, + }, + }, + AppliedIndex: appliedIndex, + }, + } +} diff --git a/tests/integrations/go.mod b/tests/integrations/go.mod index c786db107e2..cdd244cafc1 100644 --- a/tests/integrations/go.mod +++ b/tests/integrations/go.mod @@ -14,7 +14,7 @@ require ( github.com/go-sql-driver/mysql v1.7.0 github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c github.com/pingcap/failpoint v0.0.0-20220801062533-2eaa32854a6c - github.com/pingcap/kvproto v0.0.0-20240222024302-881fcbf5bc41 + github.com/pingcap/kvproto v0.0.0-20240403065636-c699538f7aa1 github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 github.com/prometheus/client_golang v1.19.0 github.com/prometheus/client_model v0.6.0 diff --git a/tests/integrations/go.sum b/tests/integrations/go.sum index f795950bc5f..79f7dddd130 100644 --- a/tests/integrations/go.sum +++ b/tests/integrations/go.sum @@ -368,8 +368,8 @@ github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c/go.mod h1:X2r9ue github.com/pingcap/failpoint v0.0.0-20220801062533-2eaa32854a6c h1:CgbKAHto5CQgWM9fSBIvaxsJHuGP0uM74HXtv3MyyGQ= github.com/pingcap/failpoint v0.0.0-20220801062533-2eaa32854a6c/go.mod h1:4qGtCB0QK0wBzKtFEGDhxXnSnbQApw1gc9siScUl8ew= github.com/pingcap/kvproto v0.0.0-20191211054548-3c6b38ea5107/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w= -github.com/pingcap/kvproto v0.0.0-20240222024302-881fcbf5bc41 h1:7tDr4J6gGQ3OqBq+lZQkI9wlJIIXFitHjNK8ymU/SEo= -github.com/pingcap/kvproto v0.0.0-20240222024302-881fcbf5bc41/go.mod h1:rXxWk2UnwfUhLXha1jxRWPADw9eMZGWEWCg92Tgmb/8= +github.com/pingcap/kvproto v0.0.0-20240403065636-c699538f7aa1 h1:vDWWJKU6ztczn24XixahtLwcnJ15DOtSRIRM3jVtZNU= +github.com/pingcap/kvproto v0.0.0-20240403065636-c699538f7aa1/go.mod h1:rXxWk2UnwfUhLXha1jxRWPADw9eMZGWEWCg92Tgmb/8= github.com/pingcap/log v0.0.0-20210625125904-98ed8e2eb1c7/go.mod h1:8AanEdAHATuRurdGxZXBz0At+9avep+ub7U1AGYLIMM= github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 h1:HR/ylkkLmGdSSDaD8IDP+SZrdhV1Kibl9KrHxJ9eciw= github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4= diff --git a/tests/integrations/mcs/scheduling/api_test.go b/tests/integrations/mcs/scheduling/api_test.go index 38294598bc5..be51123532d 100644 --- a/tests/integrations/mcs/scheduling/api_test.go +++ b/tests/integrations/mcs/scheduling/api_test.go @@ -650,6 +650,8 @@ func (suite *apiTestSuite) checkStores(cluster *tests.TestCluster) { for _, store := range stores { tests.MustPutStore(re, cluster, store) } + // prevent the offline store from changing to tombstone + tests.MustPutRegion(re, cluster, 3, 6, []byte("a"), []byte("b")) // Test /stores apiServerAddr := cluster.GetLeaderServer().GetAddr() urlPrefix := fmt.Sprintf("%s/pd/api/v1/stores", apiServerAddr) diff --git a/tests/server/api/rule_test.go b/tests/server/api/rule_test.go index 912ff83e8d5..83ab0f1cebb 100644 --- a/tests/server/api/rule_test.go +++ b/tests/server/api/rule_test.go @@ -780,11 +780,7 @@ func (suite *ruleTestSuite) checkBundle(cluster *tests.TestCluster) { }, }, } - var bundles []placement.GroupBundle - err := tu.ReadGetJSON(re, testDialClient, urlPrefix+"/placement-rule", &bundles) - re.NoError(err) - re.Len(bundles, 1) - suite.assertBundleEqual(re, bundles[0], b1) + suite.assertBundlesEqual(re, urlPrefix+"/placement-rule", []placement.GroupBundle{b1}, 1) // Set b2 := placement.GroupBundle{ @@ -801,27 +797,17 @@ func (suite *ruleTestSuite) checkBundle(cluster *tests.TestCluster) { re.NoError(err) // Get - var bundle placement.GroupBundle - err = tu.ReadGetJSON(re, testDialClient, urlPrefix+"/placement-rule/foo", &bundle) - re.NoError(err) - suite.assertBundleEqual(re, bundle, b2) + suite.assertBundleEqual(re, urlPrefix+"/placement-rule/foo", b2) // GetAll again - err = tu.ReadGetJSON(re, testDialClient, urlPrefix+"/placement-rule", &bundles) - re.NoError(err) - re.Len(bundles, 2) - suite.assertBundleEqual(re, bundles[0], b1) - suite.assertBundleEqual(re, bundles[1], b2) + suite.assertBundlesEqual(re, urlPrefix+"/placement-rule", []placement.GroupBundle{b1, b2}, 2) // Delete err = tu.CheckDelete(testDialClient, urlPrefix+"/placement-rule/pd", tu.StatusOK(re)) re.NoError(err) // GetAll again - err = tu.ReadGetJSON(re, testDialClient, urlPrefix+"/placement-rule", &bundles) - re.NoError(err) - re.Len(bundles, 1) - suite.assertBundleEqual(re, bundles[0], b2) + suite.assertBundlesEqual(re, urlPrefix+"/placement-rule", []placement.GroupBundle{b2}, 1) // SetAll b2.Rules = append(b2.Rules, &placement.Rule{GroupID: "foo", ID: "baz", Index: 2, Role: placement.Follower, Count: 1}) @@ -833,22 +819,14 @@ func (suite *ruleTestSuite) checkBundle(cluster *tests.TestCluster) { re.NoError(err) // GetAll again - err = tu.ReadGetJSON(re, testDialClient, urlPrefix+"/placement-rule", &bundles) - re.NoError(err) - re.Len(bundles, 3) - suite.assertBundleEqual(re, bundles[0], b2) - suite.assertBundleEqual(re, bundles[1], b1) - suite.assertBundleEqual(re, bundles[2], b3) + suite.assertBundlesEqual(re, urlPrefix+"/placement-rule", []placement.GroupBundle{b1, b2, b3}, 3) // Delete using regexp err = tu.CheckDelete(testDialClient, urlPrefix+"/placement-rule/"+url.PathEscape("foo.*")+"?regexp", tu.StatusOK(re)) re.NoError(err) // GetAll again - err = tu.ReadGetJSON(re, testDialClient, urlPrefix+"/placement-rule", &bundles) - re.NoError(err) - re.Len(bundles, 1) - suite.assertBundleEqual(re, bundles[0], b1) + suite.assertBundlesEqual(re, urlPrefix+"/placement-rule", []placement.GroupBundle{b1}, 1) // Set id := "rule-without-group-id" @@ -865,18 +843,11 @@ func (suite *ruleTestSuite) checkBundle(cluster *tests.TestCluster) { b4.ID = id b4.Rules[0].GroupID = b4.ID - // Get - err = tu.ReadGetJSON(re, testDialClient, urlPrefix+"/placement-rule/"+id, &bundle) - re.NoError(err) - suite.assertBundleEqual(re, bundle, b4) + suite.assertBundleEqual(re, urlPrefix+"/placement-rule/"+id, b4) // GetAll again - err = tu.ReadGetJSON(re, testDialClient, urlPrefix+"/placement-rule", &bundles) - re.NoError(err) - re.Len(bundles, 2) - suite.assertBundleEqual(re, bundles[0], b1) - suite.assertBundleEqual(re, bundles[1], b4) + suite.assertBundlesEqual(re, urlPrefix+"/placement-rule", []placement.GroupBundle{b1, b4}, 2) // SetAll b5 := placement.GroupBundle{ @@ -894,12 +865,7 @@ func (suite *ruleTestSuite) checkBundle(cluster *tests.TestCluster) { b5.Rules[0].GroupID = b5.ID // GetAll again - err = tu.ReadGetJSON(re, testDialClient, urlPrefix+"/placement-rule", &bundles) - re.NoError(err) - re.Len(bundles, 3) - suite.assertBundleEqual(re, bundles[0], b1) - suite.assertBundleEqual(re, bundles[1], b4) - suite.assertBundleEqual(re, bundles[2], b5) + suite.assertBundlesEqual(re, urlPrefix+"/placement-rule", []placement.GroupBundle{b1, b4, b5}, 3) } func (suite *ruleTestSuite) TestBundleBadRequest() { @@ -1228,9 +1194,35 @@ func (suite *ruleTestSuite) checkLargeRules(cluster *tests.TestCluster) { suite.postAndCheckRuleBundle(urlPrefix, genBundlesWithRulesNum(etcdutil.MaxEtcdTxnOps*2)) } -func (suite *ruleTestSuite) assertBundleEqual(re *require.Assertions, b1, b2 placement.GroupBundle) { +func (suite *ruleTestSuite) assertBundleEqual(re *require.Assertions, url string, expectedBundle placement.GroupBundle) { + var bundle placement.GroupBundle + tu.Eventually(re, func() bool { + err := tu.ReadGetJSON(re, testDialClient, url, &bundle) + if err != nil { + return false + } + return suite.compareBundle(bundle, expectedBundle) + }) +} + +func (suite *ruleTestSuite) assertBundlesEqual(re *require.Assertions, url string, expectedBundles []placement.GroupBundle, expectedLen int) { + var bundles []placement.GroupBundle tu.Eventually(re, func() bool { - return suite.compareBundle(b1, b2) + err := tu.ReadGetJSON(re, testDialClient, url, &bundles) + if err != nil { + return false + } + if len(bundles) != expectedLen { + return false + } + sort.Slice(bundles, func(i, j int) bool { return bundles[i].ID < bundles[j].ID }) + sort.Slice(expectedBundles, func(i, j int) bool { return expectedBundles[i].ID < expectedBundles[j].ID }) + for i := range bundles { + if !suite.compareBundle(bundles[i], expectedBundles[i]) { + return false + } + } + return true }) } diff --git a/tests/server/api/scheduler_test.go b/tests/server/api/scheduler_test.go index 3b98be4bbb5..2329077209d 100644 --- a/tests/server/api/scheduler_test.go +++ b/tests/server/api/scheduler_test.go @@ -161,18 +161,20 @@ func (suite *scheduleTestSuite) checkAPI(cluster *tests.TestCluster) { name: "balance-leader-scheduler", createdName: "balance-leader-scheduler", extraTestFunc: func(name string) { - resp := make(map[string]any) listURL := fmt.Sprintf("%s%s%s/%s/list", leaderAddr, apiPrefix, server.SchedulerConfigHandlerPath, name) - re.NoError(tu.ReadGetJSON(re, testDialClient, listURL, &resp)) - re.Equal(4.0, resp["batch"]) + resp := make(map[string]any) + tu.Eventually(re, func() bool { + re.NoError(tu.ReadGetJSON(re, testDialClient, listURL, &resp)) + return resp["batch"] == 4.0 + }) dataMap := make(map[string]any) dataMap["batch"] = 3 updateURL := fmt.Sprintf("%s%s%s/%s/config", leaderAddr, apiPrefix, server.SchedulerConfigHandlerPath, name) body, err := json.Marshal(dataMap) re.NoError(err) re.NoError(tu.CheckPostJSON(testDialClient, updateURL, body, tu.StatusOK(re))) + resp = make(map[string]any) tu.Eventually(re, func() bool { // wait for scheduling server to be synced. - resp = make(map[string]any) re.NoError(tu.ReadGetJSON(re, testDialClient, listURL, &resp)) return resp["batch"] == 3.0 }) @@ -192,8 +194,10 @@ func (suite *scheduleTestSuite) checkAPI(cluster *tests.TestCluster) { tu.StringEqual(re, "\"invalid batch size which should be an integer between 1 and 10\"\n")) re.NoError(err) resp = make(map[string]any) - re.NoError(tu.ReadGetJSON(re, testDialClient, listURL, &resp)) - re.Equal(3.0, resp["batch"]) + tu.Eventually(re, func() bool { + re.NoError(tu.ReadGetJSON(re, testDialClient, listURL, &resp)) + return resp["batch"] == 3.0 + }) // empty body err = tu.CheckPostJSON(testDialClient, updateURL, nil, tu.Status(re, http.StatusInternalServerError), @@ -216,7 +220,6 @@ func (suite *scheduleTestSuite) checkAPI(cluster *tests.TestCluster) { extraTestFunc: func(name string) { resp := make(map[string]any) listURL := fmt.Sprintf("%s%s%s/%s/list", leaderAddr, apiPrefix, server.SchedulerConfigHandlerPath, name) - re.NoError(tu.ReadGetJSON(re, testDialClient, listURL, &resp)) expectMap := map[string]any{ "min-hot-byte-rate": 100.0, "min-hot-key-rate": 10.0, @@ -241,10 +244,16 @@ func (suite *scheduleTestSuite) checkAPI(cluster *tests.TestCluster) { "history-sample-duration": "5m0s", "history-sample-interval": "30s", } - re.Equal(len(expectMap), len(resp), "expect %v, got %v", expectMap, resp) - for key := range expectMap { - re.Equal(expectMap[key], resp[key]) - } + tu.Eventually(re, func() bool { + re.NoError(tu.ReadGetJSON(re, testDialClient, listURL, &resp)) + re.Equal(len(expectMap), len(resp), "expect %v, got %v", expectMap, resp) + for key := range expectMap { + if !reflect.DeepEqual(resp[key], expectMap[key]) { + return false + } + } + return true + }) dataMap := make(map[string]any) dataMap["max-zombie-rounds"] = 5.0 expectMap["max-zombie-rounds"] = 5.0 @@ -253,11 +262,15 @@ func (suite *scheduleTestSuite) checkAPI(cluster *tests.TestCluster) { re.NoError(err) re.NoError(tu.CheckPostJSON(testDialClient, updateURL, body, tu.StatusOK(re))) resp = make(map[string]any) - re.NoError(tu.ReadGetJSON(re, testDialClient, listURL, &resp)) - - for key := range expectMap { - re.Equal(expectMap[key], resp[key], "key %s", key) - } + tu.Eventually(re, func() bool { + re.NoError(tu.ReadGetJSON(re, testDialClient, listURL, &resp)) + for key := range expectMap { + if !reflect.DeepEqual(resp[key], expectMap[key]) { + return false + } + } + return true + }) // update again err = tu.CheckPostJSON(testDialClient, updateURL, body, @@ -279,11 +292,12 @@ func (suite *scheduleTestSuite) checkAPI(cluster *tests.TestCluster) { name: "split-bucket-scheduler", createdName: "split-bucket-scheduler", extraTestFunc: func(name string) { - resp := make(map[string]any) listURL := fmt.Sprintf("%s%s%s/%s/list", leaderAddr, apiPrefix, server.SchedulerConfigHandlerPath, name) - re.NoError(tu.ReadGetJSON(re, testDialClient, listURL, &resp)) - re.Equal(3.0, resp["degree"]) - re.Equal(0.0, resp["split-limit"]) + resp := make(map[string]any) + tu.Eventually(re, func() bool { + re.NoError(tu.ReadGetJSON(re, testDialClient, listURL, &resp)) + return resp["degree"] == 3.0 && resp["split-limit"] == 0.0 + }) dataMap := make(map[string]any) dataMap["degree"] = 4 updateURL := fmt.Sprintf("%s%s%s/%s/config", leaderAddr, apiPrefix, server.SchedulerConfigHandlerPath, name) @@ -291,8 +305,10 @@ func (suite *scheduleTestSuite) checkAPI(cluster *tests.TestCluster) { re.NoError(err) re.NoError(tu.CheckPostJSON(testDialClient, updateURL, body, tu.StatusOK(re))) resp = make(map[string]any) - re.NoError(tu.ReadGetJSON(re, testDialClient, listURL, &resp)) - re.Equal(4.0, resp["degree"]) + tu.Eventually(re, func() bool { + re.NoError(tu.ReadGetJSON(re, testDialClient, listURL, &resp)) + return resp["degree"] == 4.0 + }) // update again err = tu.CheckPostJSON(testDialClient, updateURL, body, tu.StatusOK(re), @@ -336,8 +352,10 @@ func (suite *scheduleTestSuite) checkAPI(cluster *tests.TestCluster) { extraTestFunc: func(name string) { resp := make(map[string]any) listURL := fmt.Sprintf("%s%s%s/%s/list", leaderAddr, apiPrefix, server.SchedulerConfigHandlerPath, name) - re.NoError(tu.ReadGetJSON(re, testDialClient, listURL, &resp)) - re.Equal(4.0, resp["batch"]) + tu.Eventually(re, func() bool { + re.NoError(tu.ReadGetJSON(re, testDialClient, listURL, &resp)) + return resp["batch"] == 4.0 + }) dataMap := make(map[string]any) dataMap["batch"] = 3 updateURL := fmt.Sprintf("%s%s%s/%s/config", leaderAddr, apiPrefix, server.SchedulerConfigHandlerPath, name) @@ -345,8 +363,10 @@ func (suite *scheduleTestSuite) checkAPI(cluster *tests.TestCluster) { re.NoError(err) re.NoError(tu.CheckPostJSON(testDialClient, updateURL, body, tu.StatusOK(re))) resp = make(map[string]any) - re.NoError(tu.ReadGetJSON(re, testDialClient, listURL, &resp)) - re.Equal(3.0, resp["batch"]) + tu.Eventually(re, func() bool { + re.NoError(tu.ReadGetJSON(re, testDialClient, listURL, &resp)) + return resp["batch"] == 3.0 + }) // update again err = tu.CheckPostJSON(testDialClient, updateURL, body, tu.StatusOK(re), @@ -362,8 +382,10 @@ func (suite *scheduleTestSuite) checkAPI(cluster *tests.TestCluster) { tu.StringEqual(re, "\"invalid batch size which should be an integer between 1 and 10\"\n")) re.NoError(err) resp = make(map[string]any) - re.NoError(tu.ReadGetJSON(re, testDialClient, listURL, &resp)) - re.Equal(3.0, resp["batch"]) + tu.Eventually(re, func() bool { + re.NoError(tu.ReadGetJSON(re, testDialClient, listURL, &resp)) + return resp["batch"] == 3.0 + }) // empty body err = tu.CheckPostJSON(testDialClient, updateURL, nil, tu.Status(re, http.StatusInternalServerError), @@ -387,10 +409,12 @@ func (suite *scheduleTestSuite) checkAPI(cluster *tests.TestCluster) { extraTestFunc: func(name string) { resp := make(map[string]any) listURL := fmt.Sprintf("%s%s%s/%s/list", leaderAddr, apiPrefix, server.SchedulerConfigHandlerPath, name) - re.NoError(tu.ReadGetJSON(re, testDialClient, listURL, &resp)) - exceptMap := make(map[string]any) - exceptMap["1"] = []any{map[string]any{"end-key": "", "start-key": ""}} - re.Equal(exceptMap, resp["store-id-ranges"]) + expectedMap := make(map[string]any) + expectedMap["1"] = []any{map[string]any{"end-key": "", "start-key": ""}} + tu.Eventually(re, func() bool { + re.NoError(tu.ReadGetJSON(re, testDialClient, listURL, &resp)) + return reflect.DeepEqual(expectedMap, resp["store-id-ranges"]) + }) // using /pd/v1/schedule-config/grant-leader-scheduler/config to add new store to grant-leader-scheduler input := make(map[string]any) @@ -400,19 +424,23 @@ func (suite *scheduleTestSuite) checkAPI(cluster *tests.TestCluster) { body, err := json.Marshal(input) re.NoError(err) re.NoError(tu.CheckPostJSON(testDialClient, updateURL, body, tu.StatusOK(re))) + expectedMap["2"] = []any{map[string]any{"end-key": "", "start-key": ""}} resp = make(map[string]any) - re.NoError(tu.ReadGetJSON(re, testDialClient, listURL, &resp)) - exceptMap["2"] = []any{map[string]any{"end-key": "", "start-key": ""}} - re.Equal(exceptMap, resp["store-id-ranges"]) + tu.Eventually(re, func() bool { + re.NoError(tu.ReadGetJSON(re, testDialClient, listURL, &resp)) + return reflect.DeepEqual(expectedMap, resp["store-id-ranges"]) + }) // using /pd/v1/schedule-config/grant-leader-scheduler/config to delete exists store from grant-leader-scheduler deleteURL := fmt.Sprintf("%s%s%s/%s/delete/%s", leaderAddr, apiPrefix, server.SchedulerConfigHandlerPath, name, "2") err = tu.CheckDelete(testDialClient, deleteURL, tu.StatusOK(re)) re.NoError(err) + delete(expectedMap, "2") resp = make(map[string]any) - re.NoError(tu.ReadGetJSON(re, testDialClient, listURL, &resp)) - delete(exceptMap, "2") - re.Equal(exceptMap, resp["store-id-ranges"]) + tu.Eventually(re, func() bool { + re.NoError(tu.ReadGetJSON(re, testDialClient, listURL, &resp)) + return reflect.DeepEqual(expectedMap, resp["store-id-ranges"]) + }) err = tu.CheckDelete(testDialClient, deleteURL, tu.Status(re, http.StatusNotFound)) re.NoError(err) }, @@ -425,10 +453,10 @@ func (suite *scheduleTestSuite) checkAPI(cluster *tests.TestCluster) { extraTestFunc: func(name string) { resp := make(map[string]any) listURL := fmt.Sprintf("%s%s%s/%s/list", leaderAddr, apiPrefix, server.SchedulerConfigHandlerPath, name) - re.NoError(tu.ReadGetJSON(re, testDialClient, listURL, &resp)) - re.Equal("", resp["start-key"]) - re.Equal("", resp["end-key"]) - re.Equal("test", resp["range-name"]) + tu.Eventually(re, func() bool { + re.NoError(tu.ReadGetJSON(re, testDialClient, listURL, &resp)) + return resp["start-key"] == "" && resp["end-key"] == "" && resp["range-name"] == "test" + }) resp["start-key"] = "a_00" resp["end-key"] = "a_99" updateURL := fmt.Sprintf("%s%s%s/%s/config", leaderAddr, apiPrefix, server.SchedulerConfigHandlerPath, name) @@ -436,10 +464,10 @@ func (suite *scheduleTestSuite) checkAPI(cluster *tests.TestCluster) { re.NoError(err) re.NoError(tu.CheckPostJSON(testDialClient, updateURL, body, tu.StatusOK(re))) resp = make(map[string]any) - re.NoError(tu.ReadGetJSON(re, testDialClient, listURL, &resp)) - re.Equal("a_00", resp["start-key"]) - re.Equal("a_99", resp["end-key"]) - re.Equal("test", resp["range-name"]) + tu.Eventually(re, func() bool { + re.NoError(tu.ReadGetJSON(re, testDialClient, listURL, &resp)) + return resp["start-key"] == "a_00" && resp["end-key"] == "a_99" && resp["range-name"] == "test" + }) }, }, { @@ -450,10 +478,12 @@ func (suite *scheduleTestSuite) checkAPI(cluster *tests.TestCluster) { extraTestFunc: func(name string) { resp := make(map[string]any) listURL := fmt.Sprintf("%s%s%s/%s/list", leaderAddr, apiPrefix, server.SchedulerConfigHandlerPath, name) - re.NoError(tu.ReadGetJSON(re, testDialClient, listURL, &resp)) - exceptMap := make(map[string]any) - exceptMap["3"] = []any{map[string]any{"end-key": "", "start-key": ""}} - re.Equal(exceptMap, resp["store-id-ranges"]) + expectedMap := make(map[string]any) + expectedMap["3"] = []any{map[string]any{"end-key": "", "start-key": ""}} + tu.Eventually(re, func() bool { + re.NoError(tu.ReadGetJSON(re, testDialClient, listURL, &resp)) + return reflect.DeepEqual(expectedMap, resp["store-id-ranges"]) + }) // using /pd/v1/schedule-config/evict-leader-scheduler/config to add new store to evict-leader-scheduler input := make(map[string]any) @@ -463,22 +493,22 @@ func (suite *scheduleTestSuite) checkAPI(cluster *tests.TestCluster) { body, err := json.Marshal(input) re.NoError(err) re.NoError(tu.CheckPostJSON(testDialClient, updateURL, body, tu.StatusOK(re))) + expectedMap["4"] = []any{map[string]any{"end-key": "", "start-key": ""}} resp = make(map[string]any) tu.Eventually(re, func() bool { re.NoError(tu.ReadGetJSON(re, testDialClient, listURL, &resp)) - exceptMap["4"] = []any{map[string]any{"end-key": "", "start-key": ""}} - return reflect.DeepEqual(exceptMap, resp["store-id-ranges"]) + return reflect.DeepEqual(expectedMap, resp["store-id-ranges"]) }) // using /pd/v1/schedule-config/evict-leader-scheduler/config to delete exist store from evict-leader-scheduler deleteURL := fmt.Sprintf("%s%s%s/%s/delete/%s", leaderAddr, apiPrefix, server.SchedulerConfigHandlerPath, name, "4") err = tu.CheckDelete(testDialClient, deleteURL, tu.StatusOK(re)) re.NoError(err) + delete(expectedMap, "4") resp = make(map[string]any) tu.Eventually(re, func() bool { re.NoError(tu.ReadGetJSON(re, testDialClient, listURL, &resp)) - delete(exceptMap, "4") - return reflect.DeepEqual(exceptMap, resp["store-id-ranges"]) + return reflect.DeepEqual(expectedMap, resp["store-id-ranges"]) }) err = tu.CheckDelete(testDialClient, deleteURL, tu.Status(re, http.StatusNotFound)) re.NoError(err) diff --git a/tests/server/config/config_test.go b/tests/server/config/config_test.go index 1bc035cceca..108bc5fc753 100644 --- a/tests/server/config/config_test.go +++ b/tests/server/config/config_test.go @@ -26,6 +26,7 @@ import ( "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" + cfg "github.com/tikv/pd/pkg/mcs/scheduling/server/config" "github.com/tikv/pd/pkg/ratelimit" sc "github.com/tikv/pd/pkg/schedule/config" tu "github.com/tikv/pd/pkg/utils/testutil" @@ -100,6 +101,7 @@ func (suite *configTestSuite) SetupSuite() { func (suite *configTestSuite) TearDownSuite() { suite.env.Cleanup() } + func (suite *configTestSuite) TestConfigAll() { suite.env.RunTestInTwoModes(suite.checkConfigAll) } @@ -139,16 +141,17 @@ func (suite *configTestSuite) checkConfigAll(cluster *tests.TestCluster) { re.NoError(err) err = tu.CheckPostJSON(testDialClient, addr, postData, tu.StatusOK(re)) re.NoError(err) - - newCfg := &config.Config{} - err = tu.ReadGetJSON(re, testDialClient, addr, newCfg) - re.NoError(err) cfg.Replication.MaxReplicas = 5 cfg.Replication.LocationLabels = []string{"zone", "rack"} cfg.Schedule.RegionScheduleLimit = 10 cfg.PDServerCfg.MetricStorage = "http://127.0.0.1:9090" - re.Equal(newCfg, cfg) + tu.Eventually(re, func() bool { + newCfg := &config.Config{} + err = tu.ReadGetJSON(re, testDialClient, addr, newCfg) + re.NoError(err) + return suite.Equal(newCfg, cfg) + }) // the new way l = map[string]any{ "schedule.tolerant-size-ratio": 2.5, @@ -164,9 +167,6 @@ func (suite *configTestSuite) checkConfigAll(cluster *tests.TestCluster) { re.NoError(err) err = tu.CheckPostJSON(testDialClient, addr, postData, tu.StatusOK(re)) re.NoError(err) - newCfg1 := &config.Config{} - err = tu.ReadGetJSON(re, testDialClient, addr, newCfg1) - re.NoError(err) cfg.Schedule.EnableTiKVSplitRegion = false cfg.Schedule.TolerantSizeRatio = 2.5 cfg.Replication.LocationLabels = []string{"idc", "host"} @@ -177,7 +177,12 @@ func (suite *configTestSuite) checkConfigAll(cluster *tests.TestCluster) { v, err := versioninfo.ParseVersion("v4.0.0-beta") re.NoError(err) cfg.ClusterVersion = *v - re.Equal(cfg, newCfg1) + tu.Eventually(re, func() bool { + newCfg1 := &config.Config{} + err = tu.ReadGetJSON(re, testDialClient, addr, newCfg1) + re.NoError(err) + return suite.Equal(cfg, newCfg1) + }) // revert this to avoid it affects TestConfigTTL l["schedule.enable-tikv-split-region"] = "true" @@ -470,9 +475,10 @@ func (suite *configTestSuite) assertTTLConfig( } checkFunc(cluster.GetLeaderServer().GetServer().GetPersistOptions()) if cluster.GetSchedulingPrimaryServer() != nil { - // wait for the scheduling primary server to be synced - options := cluster.GetSchedulingPrimaryServer().GetPersistConfig() + var options *cfg.PersistConfig tu.Eventually(re, func() bool { + // wait for the scheduling primary server to be synced + options = cluster.GetSchedulingPrimaryServer().GetPersistConfig() if expectedEqual { return uint64(999) == options.GetMaxSnapshotCount() } diff --git a/tools/go.mod b/tools/go.mod index b0de9a8a9da..8287f834471 100644 --- a/tools/go.mod +++ b/tools/go.mod @@ -21,7 +21,7 @@ require ( github.com/mattn/go-shellwords v1.0.12 github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00 - github.com/pingcap/kvproto v0.0.0-20240222024302-881fcbf5bc41 + github.com/pingcap/kvproto v0.0.0-20240403065636-c699538f7aa1 github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 github.com/pkg/errors v0.9.1 github.com/prometheus/client_golang v1.19.0 diff --git a/tools/go.sum b/tools/go.sum index bea8714ee39..ac6cc75903e 100644 --- a/tools/go.sum +++ b/tools/go.sum @@ -365,8 +365,8 @@ github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c/go.mod h1:X2r9ue github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00 h1:C3N3itkduZXDZFh4N3vQ5HEtld3S+Y+StULhWVvumU0= github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00/go.mod h1:4qGtCB0QK0wBzKtFEGDhxXnSnbQApw1gc9siScUl8ew= github.com/pingcap/kvproto v0.0.0-20191211054548-3c6b38ea5107/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w= -github.com/pingcap/kvproto v0.0.0-20240222024302-881fcbf5bc41 h1:7tDr4J6gGQ3OqBq+lZQkI9wlJIIXFitHjNK8ymU/SEo= -github.com/pingcap/kvproto v0.0.0-20240222024302-881fcbf5bc41/go.mod h1:rXxWk2UnwfUhLXha1jxRWPADw9eMZGWEWCg92Tgmb/8= +github.com/pingcap/kvproto v0.0.0-20240403065636-c699538f7aa1 h1:vDWWJKU6ztczn24XixahtLwcnJ15DOtSRIRM3jVtZNU= +github.com/pingcap/kvproto v0.0.0-20240403065636-c699538f7aa1/go.mod h1:rXxWk2UnwfUhLXha1jxRWPADw9eMZGWEWCg92Tgmb/8= github.com/pingcap/log v0.0.0-20210625125904-98ed8e2eb1c7/go.mod h1:8AanEdAHATuRurdGxZXBz0At+9avep+ub7U1AGYLIMM= github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 h1:HR/ylkkLmGdSSDaD8IDP+SZrdhV1Kibl9KrHxJ9eciw= github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4=