diff --git a/server/cluster/coordinator.go b/server/cluster/coordinator.go
index a6b0783d93d..d62de672896 100644
--- a/server/cluster/coordinator.go
+++ b/server/cluster/coordinator.go
@@ -641,6 +641,10 @@ func (c *coordinator) addScheduler(scheduler schedule.Scheduler, args ...string)
 	c.wg.Add(1)
 	go c.runScheduler(s)
 	c.schedulers[s.GetName()] = s
+	if err := schedule.SaveSchedulerConfig(c.cluster.storage, scheduler); err != nil {
+		log.Error("can not save scheduler config", zap.String("scheduler-name", scheduler.GetName()), errs.ZapError(err))
+		return err
+	}
 	c.cluster.opt.AddSchedulerCfg(s.GetType(), args)
 	return nil
 }
diff --git a/server/cluster/coordinator_test.go b/server/cluster/coordinator_test.go
index 20ab1f4f8fa..c15302015f7 100644
--- a/server/cluster/coordinator_test.go
+++ b/server/cluster/coordinator_test.go
@@ -749,8 +749,9 @@ func (s *testCoordinatorSuite) TestPersistScheduler(c *C) {
 	// whether the schedulers added or removed in dynamic way are recorded in opt
 	_, newOpt, err := newTestScheduleConfig()
 	c.Assert(err, IsNil)
-	_, err = schedule.CreateScheduler(schedulers.ShuffleRegionType, oc, storage, schedule.ConfigJSONDecoder([]byte("null")))
+	shuffle, err := schedule.CreateScheduler(schedulers.ShuffleRegionType, oc, storage, schedule.ConfigJSONDecoder([]byte("null")))
 	c.Assert(err, IsNil)
+	c.Assert(co.addScheduler(shuffle), IsNil)
 	// suppose we add a new default enable scheduler
 	config.DefaultSchedulers = append(config.DefaultSchedulers, config.SchedulerConfig{Type: "shuffle-region"})
 	defer func() {
diff --git a/server/schedule/scheduler.go b/server/schedule/scheduler.go
index d440326967c..7a5d3c02ad6 100644
--- a/server/schedule/scheduler.go
+++ b/server/schedule/scheduler.go
@@ -117,17 +117,16 @@ func CreateScheduler(typ string, opController *OperatorController, storage endpo
 	if !ok {
 		return nil, errs.ErrSchedulerCreateFuncNotRegistered.FastGenByArgs(typ)
 	}
+	return fn(opController, storage, dec)
+}
 
-	s, err := fn(opController, storage, dec)
-	if err != nil {
-		return nil, err
-	}
+// SaveSchedulerConfig saves the config of the specified scheduler.
+func SaveSchedulerConfig(storage endpoint.ConfigStorage, s Scheduler) error {
 	data, err := s.EncodeConfig()
 	if err != nil {
-		return nil, err
+		return err
 	}
-	err = storage.SaveScheduleConfig(s.GetName(), data)
-	return s, err
+	return storage.SaveScheduleConfig(s.GetName(), data)
 }
 
 // FindSchedulerTypeByName finds the type of the specified name.
diff --git a/tests/server/api/testutil.go b/tests/server/api/testutil.go
new file mode 100644
index 00000000000..37f4be0bc16
--- /dev/null
+++ b/tests/server/api/testutil.go
@@ -0,0 +1,59 @@
+// Copyright 2023 TiKV Project Authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package api
+
+import (
+	"bytes"
+	"encoding/json"
+	"fmt"
+	"io"
+	"net/http"
+
+	"github.com/pingcap/check"
+)
+
+const schedulersPrefix = "/pd/api/v1/schedulers"
+
+// dialClient used to dial http request.
+var dialClient = &http.Client{
+	Transport: &http.Transport{
+		DisableKeepAlives: true,
+	},
+}
+
+// MustAddScheduler adds a scheduler with HTTP API.
+func MustAddScheduler(
+	c *check.C, serverAddr string,
+	schedulerName string, args map[string]interface{},
+) {
+	request := map[string]interface{}{
+		"name": schedulerName,
+	}
+	for arg, val := range args {
+		request[arg] = val
+	}
+	data, err := json.Marshal(request)
+	c.Assert(err, check.IsNil)
+
+	httpReq, err := http.NewRequest(http.MethodPost, fmt.Sprintf("%s%s", serverAddr, schedulersPrefix), bytes.NewBuffer(data))
+	c.Assert(err, check.IsNil)
+	// Send request.
+	resp, err := dialClient.Do(httpReq)
+	c.Assert(err, check.IsNil)
+	defer resp.Body.Close()
+	_, err = io.ReadAll(resp.Body)
+	c.Assert(err, check.IsNil)
+	c.Assert(resp.StatusCode, check.Equals, http.StatusOK)
+}
diff --git a/tests/server/cluster/cluster_test.go b/tests/server/cluster/cluster_test.go
index bbb6da37391..7acff3f2194 100644
--- a/tests/server/cluster/cluster_test.go
+++ b/tests/server/cluster/cluster_test.go
@@ -41,8 +41,10 @@ import (
 	"github.com/tikv/pd/server/id"
 	syncer "github.com/tikv/pd/server/region_syncer"
 	"github.com/tikv/pd/server/schedule/operator"
+	"github.com/tikv/pd/server/schedulers"
 	"github.com/tikv/pd/server/storage"
 	"github.com/tikv/pd/tests"
+	"github.com/tikv/pd/tests/server/api"
 	"google.golang.org/grpc/codes"
 	"google.golang.org/grpc/status"
 )
@@ -1420,3 +1422,119 @@ func (s *clusterTestSuite) TestTransferLeaderBack(c *C) {
 	c.Assert(rc.GetMetaCluster(), DeepEquals, meta)
 	c.Assert(rc.GetStoreCount(), Equals, 3)
 }
+
+func (s *clusterTestSuite) TestTransferLeaderForScheduler(c *C) {
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+	c.Assert(failpoint.Enable("github.com/tikv/pd/server/cluster/changeCoordinatorTicker", `return(true)`), IsNil)
+	tc, err := tests.NewTestCluster(ctx, 2)
+	defer tc.Destroy()
+	c.Assert(err, IsNil)
+	err = tc.RunInitialServers()
+	c.Assert(err, IsNil)
+	tc.WaitLeader()
+	// start
+	leaderServer := tc.GetServer(tc.GetLeader())
+	c.Assert(leaderServer.BootstrapCluster(), IsNil)
+	rc := leaderServer.GetServer().GetRaftCluster()
+	c.Assert(rc, NotNil)
+
+	storesNum := 2
+	grpcPDClient := testutil.MustNewGrpcClient(c, leaderServer.GetAddr())
+	for i := 1; i <= storesNum; i++ {
+		store := &metapb.Store{
+			Id:      uint64(i),
+			Address: "127.0.0.1:" + strconv.Itoa(i),
+		}
+		resp, err := putStore(grpcPDClient, leaderServer.GetClusterID(), store)
+		c.Assert(err, IsNil)
+		c.Assert(resp.GetHeader().GetError().GetType(), Equals, pdpb.ErrorType_OK)
+	}
+	// region heartbeat
+	id := leaderServer.GetAllocator()
+	s.putRegionWithLeader(c, rc, id, 1)
+
+	time.Sleep(time.Second)
+	c.Assert(leaderServer.GetRaftCluster().IsPrepared(), IsTrue)
+
+	// Add evict leader scheduler
+	api.MustAddScheduler(c, leaderServer.GetAddr(), schedulers.EvictLeaderName, map[string]interface{}{
+		"store_id": 1,
+	})
+	api.MustAddScheduler(c, leaderServer.GetAddr(), schedulers.EvictLeaderName, map[string]interface{}{
+		"store_id": 2,
+	})
+	// Check scheduler updated.
+	c.Assert(len(rc.GetSchedulers()), Equals, 5)
+	checkEvictLeaderSchedulerExist(c, rc, true)
+	checkEvictLeaderStoreIDs(c, rc, []uint64{1, 2})
+
+	// transfer PD leader to another PD
+	tc.ResignLeader()
+	rc.Stop()
+	tc.WaitLeader()
+	leaderServer = tc.GetServer(tc.GetLeader())
+	rc1 := leaderServer.GetServer().GetRaftCluster()
+	err = rc1.Start(leaderServer.GetServer())
+	c.Assert(err, IsNil)
+	c.Assert(rc1, NotNil)
+	// region heartbeat
+	id = leaderServer.GetAllocator()
+	s.putRegionWithLeader(c, rc1, id, 1)
+	time.Sleep(time.Second)
+	c.Assert(leaderServer.GetRaftCluster().IsPrepared(), IsTrue)
+	// Check scheduler updated.
+	c.Assert(len(rc1.GetSchedulers()), Equals, 5)
+	checkEvictLeaderSchedulerExist(c, rc1, true)
+	checkEvictLeaderStoreIDs(c, rc1, []uint64{1, 2})
+
+	// transfer PD leader back to the previous PD
+	tc.ResignLeader()
+	rc1.Stop()
+	tc.WaitLeader()
+	leaderServer = tc.GetServer(tc.GetLeader())
+	rc = leaderServer.GetServer().GetRaftCluster()
+	rc.Start(leaderServer.GetServer())
+	c.Assert(rc, NotNil)
+	// region heartbeat
+	id = leaderServer.GetAllocator()
+	s.putRegionWithLeader(c, rc, id, 1)
+	time.Sleep(time.Second)
+	c.Assert(leaderServer.GetRaftCluster().IsPrepared(), IsTrue)
+	// Check scheduler updated.
+	c.Assert(len(rc.GetSchedulers()), Equals, 5)
+	checkEvictLeaderSchedulerExist(c, rc, true)
+	checkEvictLeaderStoreIDs(c, rc, []uint64{1, 2})
+
+	c.Assert(failpoint.Disable("github.com/tikv/pd/server/cluster/changeCoordinatorTicker"), IsNil)
+}
+
+func checkEvictLeaderSchedulerExist(c *C, rc *cluster.RaftCluster, exist bool) {
+	isExistScheduler := func(rc *cluster.RaftCluster, name string) bool {
+		s := rc.GetSchedulers()
+		for _, scheduler := range s {
+			if scheduler == name {
+				return true
+			}
+		}
+		return false
+	}
+
+	testutil.WaitUntil(c, func() bool {
+		return isExistScheduler(rc, schedulers.EvictLeaderName) == exist
+	})
+}
+
+func checkEvictLeaderStoreIDs(c *C, rc *cluster.RaftCluster, expected []uint64) {
+	handler, ok := rc.GetSchedulerHandlers()[schedulers.EvictLeaderName]
+	c.Assert(ok, IsTrue)
+	h, ok := handler.(interface {
+		EvictStoreIDs() []uint64
+	})
+	c.Assert(ok, IsTrue)
+	var evictStoreIDs []uint64
+	testutil.WaitUntil(c, func() bool {
+		evictStoreIDs = h.EvictStoreIDs()
+		return len(evictStoreIDs) == len(expected)
+	})
+}
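After this patch, schedule.CreateScheduler only constructs the scheduler; persisting its config is a separate, explicit schedule.SaveSchedulerConfig call (coordinator.addScheduler now performs it after registering the scheduler, which is what the leader-transfer test above relies on). Below is a minimal sketch of that create-then-persist flow, reusing the shuffle-region example from coordinator_test.go. It is illustrative only and not part of the patch: the helper name is hypothetical, and the oc and storage arguments are assumed to be an *schedule.OperatorController and an endpoint.ConfigStorage already held by the caller (with the corresponding pd packages imported).

    // addAndPersistShuffleRegion is a hypothetical helper showing the new flow:
    // create the scheduler, then save its config as an explicit second step.
    func addAndPersistShuffleRegion(oc *schedule.OperatorController, storage endpoint.ConfigStorage) (schedule.Scheduler, error) {
    	s, err := schedule.CreateScheduler(schedulers.ShuffleRegionType, oc, storage, schedule.ConfigJSONDecoder([]byte("null")))
    	if err != nil {
    		return nil, err
    	}
    	// CreateScheduler no longer saves the config, so persist it here;
    	// otherwise a dynamically added scheduler would not survive a restart
    	// or PD leader transfer.
    	if err := schedule.SaveSchedulerConfig(storage, s); err != nil {
    		return nil, err
    	}
    	return s, nil
    }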