scheduler: fix scheduler save config (#7108) (#7164)
close #6897

Signed-off-by: husharp <[email protected]>

Co-authored-by: husharp <[email protected]>
ti-chi-bot and HuSharp authored Oct 11, 2023
1 parent f4d1cb7 commit 5cf2c43
Showing 5 changed files with 189 additions and 8 deletions.
4 changes: 4 additions & 0 deletions server/cluster/coordinator.go
@@ -641,6 +641,10 @@ func (c *coordinator) addScheduler(scheduler schedule.Scheduler, args ...string)
 	c.wg.Add(1)
 	go c.runScheduler(s)
 	c.schedulers[s.GetName()] = s
+	if err := schedule.SaveSchedulerConfig(c.cluster.storage, scheduler); err != nil {
+		log.Error("can not save scheduler config", zap.String("scheduler-name", scheduler.GetName()), errs.ZapError(err))
+		return err
+	}
 	c.cluster.opt.AddSchedulerCfg(s.GetType(), args)
 	return nil
 }
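The four added lines make config persistence part of adding a scheduler: the save runs after the scheduler is registered with the coordinator but before its type is recorded in the cluster options, and a failed save aborts the call. Below is a compact sketch of that ordering with toy stand-ins; none of these types are the real PD ones, they only mirror the control flow visible in the hunk above.

package main

import (
	"errors"
	"fmt"
)

// Toy stand-ins for the coordinator's collaborators.
type configStorage interface {
	SaveScheduleConfig(name string, data []byte) error
}

type brokenStorage struct{}

func (brokenStorage) SaveScheduleConfig(string, []byte) error {
	return errors.New("storage unavailable")
}

type coordinator struct {
	storage       configStorage
	schedulers    map[string]bool // stands in for the running-scheduler map
	schedulerCfgs []string        // stands in for opt.AddSchedulerCfg
}

// addScheduler mirrors the ordering in the diff: register, persist, then
// record the scheduler type in the options; a save failure returns early.
func (c *coordinator) addScheduler(name string, cfg []byte) error {
	c.schedulers[name] = true
	if err := c.storage.SaveScheduleConfig(name, cfg); err != nil {
		return fmt.Errorf("can not save scheduler config: %w", err)
	}
	c.schedulerCfgs = append(c.schedulerCfgs, name)
	return nil
}

func main() {
	c := &coordinator{storage: brokenStorage{}, schedulers: map[string]bool{}}
	err := c.addScheduler("evict-leader-scheduler", []byte(`{"store_id":[1]}`))
	fmt.Println("error:", err)
	fmt.Println("configs recorded:", len(c.schedulerCfgs)) // 0: AddSchedulerCfg never ran
}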
3 changes: 2 additions & 1 deletion server/cluster/coordinator_test.go
@@ -749,8 +749,9 @@ func (s *testCoordinatorSuite) TestPersistScheduler(c *C) {
 	// whether the schedulers added or removed in dynamic way are recorded in opt
 	_, newOpt, err := newTestScheduleConfig()
 	c.Assert(err, IsNil)
-	_, err = schedule.CreateScheduler(schedulers.ShuffleRegionType, oc, storage, schedule.ConfigJSONDecoder([]byte("null")))
+	shuffle, err := schedule.CreateScheduler(schedulers.ShuffleRegionType, oc, storage, schedule.ConfigJSONDecoder([]byte("null")))
 	c.Assert(err, IsNil)
+	c.Assert(co.addScheduler(shuffle), IsNil)
 	// suppose we add a new default enable scheduler
 	config.DefaultSchedulers = append(config.DefaultSchedulers, config.SchedulerConfig{Type: "shuffle-region"})
 	defer func() {
13 changes: 6 additions & 7 deletions server/schedule/scheduler.go
@@ -117,17 +117,16 @@ func CreateScheduler(typ string, opController *OperatorController, storage endpo
 	if !ok {
 		return nil, errs.ErrSchedulerCreateFuncNotRegistered.FastGenByArgs(typ)
 	}
+	return fn(opController, storage, dec)
+}
 
-	s, err := fn(opController, storage, dec)
-	if err != nil {
-		return nil, err
-	}
+// SaveSchedulerConfig saves the config of the specified scheduler.
+func SaveSchedulerConfig(storage endpoint.ConfigStorage, s Scheduler) error {
 	data, err := s.EncodeConfig()
 	if err != nil {
-		return nil, err
+		return err
 	}
-	err = storage.SaveScheduleConfig(s.GetName(), data)
-	return s, err
+	return storage.SaveScheduleConfig(s.GetName(), data)
 }
 
 // FindSchedulerTypeByName finds the type of the specified name.
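After this hunk, CreateScheduler is a pure factory: it no longer writes the encoded config to storage, and persistence becomes an explicit SaveSchedulerConfig call, which the coordinator makes when the scheduler is actually added rather than every time one is constructed. The following is a minimal, self-contained sketch of that split using simplified stand-in types, not the real PD interfaces (endpoint.ConfigStorage, schedule.Scheduler).

package main

import (
	"encoding/json"
	"fmt"
)

// Simplified stand-ins for the real PD types.
type Scheduler interface {
	GetName() string
	EncodeConfig() ([]byte, error)
}

type ConfigStorage interface {
	SaveScheduleConfig(name string, data []byte) error
}

// memStorage is a toy in-memory config store.
type memStorage map[string][]byte

func (m memStorage) SaveScheduleConfig(name string, data []byte) error {
	m[name] = data
	return nil
}

// evictLeader is a toy scheduler whose config is a list of store IDs.
type evictLeader struct{ StoreIDs []uint64 }

func (e *evictLeader) GetName() string               { return "evict-leader-scheduler" }
func (e *evictLeader) EncodeConfig() ([]byte, error) { return json.Marshal(e) }

// createScheduler mirrors the new CreateScheduler contract: build the
// scheduler, touch nothing in storage.
func createScheduler(storeIDs ...uint64) (Scheduler, error) {
	return &evictLeader{StoreIDs: storeIDs}, nil
}

// saveSchedulerConfig mirrors schedule.SaveSchedulerConfig: encode, then persist.
func saveSchedulerConfig(storage ConfigStorage, s Scheduler) error {
	data, err := s.EncodeConfig()
	if err != nil {
		return err
	}
	return storage.SaveScheduleConfig(s.GetName(), data)
}

func main() {
	storage := memStorage{}
	s, err := createScheduler(1, 2)
	if err != nil {
		panic(err)
	}
	// Creation alone leaves storage untouched; the caller decides when to persist.
	fmt.Println("persisted before save:", len(storage)) // 0
	if err := saveSchedulerConfig(storage, s); err != nil {
		panic(err)
	}
	fmt.Printf("persisted after save: %s\n", storage[s.GetName()])
}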
59 changes: 59 additions & 0 deletions tests/server/api/testutil.go
@@ -0,0 +1,59 @@
+// Copyright 2023 TiKV Project Authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package api
+
+import (
+	"bytes"
+	"encoding/json"
+	"fmt"
+	"io"
+	"net/http"
+
+	"github.com/pingcap/check"
+)
+
+const schedulersPrefix = "/pd/api/v1/schedulers"
+
+// dialClient used to dial http request.
+var dialClient = &http.Client{
+	Transport: &http.Transport{
+		DisableKeepAlives: true,
+	},
+}
+
+// MustAddScheduler adds a scheduler with HTTP API.
+func MustAddScheduler(
+	c *check.C, serverAddr string,
+	schedulerName string, args map[string]interface{},
+) {
+	request := map[string]interface{}{
+		"name": schedulerName,
+	}
+	for arg, val := range args {
+		request[arg] = val
+	}
+	data, err := json.Marshal(request)
+	c.Assert(err, check.IsNil)
+
+	httpReq, err := http.NewRequest(http.MethodPost, fmt.Sprintf("%s%s", serverAddr, schedulersPrefix), bytes.NewBuffer(data))
+	c.Assert(err, check.IsNil)
+	// Send request.
+	resp, err := dialClient.Do(httpReq)
+	c.Assert(err, check.IsNil)
+	defer resp.Body.Close()
+	_, err = io.ReadAll(resp.Body)
+	c.Assert(err, check.IsNil)
+	c.Assert(resp.StatusCode, check.Equals, http.StatusOK)
+}
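For reference, the helper above builds its request body by merging the scheduler name with any extra arguments before POSTing to /pd/api/v1/schedulers. A small standalone snippet showing the JSON that results for the evict-leader case used later in this change, assuming schedulers.EvictLeaderName resolves to the usual "evict-leader-scheduler" string:

package main

import (
	"encoding/json"
	"fmt"
)

func main() {
	// Mirrors MustAddScheduler's request construction: start with the
	// scheduler name, then fold in the caller-supplied arguments.
	request := map[string]interface{}{
		"name": "evict-leader-scheduler",
	}
	args := map[string]interface{}{"store_id": 1}
	for arg, val := range args {
		request[arg] = val
	}
	data, _ := json.Marshal(request)
	fmt.Println(string(data)) // {"name":"evict-leader-scheduler","store_id":1}
}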
118 changes: 118 additions & 0 deletions tests/server/cluster/cluster_test.go
@@ -41,8 +41,10 @@ import (
 	"github.com/tikv/pd/server/id"
 	syncer "github.com/tikv/pd/server/region_syncer"
 	"github.com/tikv/pd/server/schedule/operator"
+	"github.com/tikv/pd/server/schedulers"
 	"github.com/tikv/pd/server/storage"
 	"github.com/tikv/pd/tests"
+	"github.com/tikv/pd/tests/server/api"
 	"google.golang.org/grpc/codes"
 	"google.golang.org/grpc/status"
@@ -1420,3 +1422,119 @@ func (s *clusterTestSuite) TestTransferLeaderBack(c *C) {
 	c.Assert(rc.GetMetaCluster(), DeepEquals, meta)
 	c.Assert(rc.GetStoreCount(), Equals, 3)
 }
+
+func (s *clusterTestSuite) TestTransferLeaderForScheduler(c *C) {
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+	c.Assert(failpoint.Enable("github.com/tikv/pd/server/cluster/changeCoordinatorTicker", `return(true)`), IsNil)
+	tc, err := tests.NewTestCluster(ctx, 2)
+	defer tc.Destroy()
+	c.Assert(err, IsNil)
+	err = tc.RunInitialServers()
+	c.Assert(err, IsNil)
+	tc.WaitLeader()
+	// start
+	leaderServer := tc.GetServer(tc.GetLeader())
+	c.Assert(leaderServer.BootstrapCluster(), IsNil)
+	rc := leaderServer.GetServer().GetRaftCluster()
+	c.Assert(rc, NotNil)
+
+	storesNum := 2
+	grpcPDClient := testutil.MustNewGrpcClient(c, leaderServer.GetAddr())
+	for i := 1; i <= storesNum; i++ {
+		store := &metapb.Store{
+			Id:      uint64(i),
+			Address: "127.0.0.1:" + strconv.Itoa(i),
+		}
+		resp, err := putStore(grpcPDClient, leaderServer.GetClusterID(), store)
+		c.Assert(err, IsNil)
+		c.Assert(resp.GetHeader().GetError().GetType(), Equals, pdpb.ErrorType_OK)
+	}
+	// region heartbeat
+	id := leaderServer.GetAllocator()
+	s.putRegionWithLeader(c, rc, id, 1)
+
+	time.Sleep(time.Second)
+	c.Assert(leaderServer.GetRaftCluster().IsPrepared(), IsTrue)
+
+	// Add evict leader scheduler
+	api.MustAddScheduler(c, leaderServer.GetAddr(), schedulers.EvictLeaderName, map[string]interface{}{
+		"store_id": 1,
+	})
+	api.MustAddScheduler(c, leaderServer.GetAddr(), schedulers.EvictLeaderName, map[string]interface{}{
+		"store_id": 2,
+	})
+	// Check scheduler updated.
+	c.Assert(len(rc.GetSchedulers()), Equals, 5)
+	checkEvictLeaderSchedulerExist(c, rc, true)
+	checkEvictLeaderStoreIDs(c, rc, []uint64{1, 2})
+
+	// transfer PD leader to another PD
+	tc.ResignLeader()
+	rc.Stop()
+	tc.WaitLeader()
+	leaderServer = tc.GetServer(tc.GetLeader())
+	rc1 := leaderServer.GetServer().GetRaftCluster()
+	rc1.Start(leaderServer.GetServer())
+	c.Assert(err, IsNil)
+	c.Assert(rc1, NotNil)
+	// region heartbeat
+	id = leaderServer.GetAllocator()
+	s.putRegionWithLeader(c, rc1, id, 1)
+	time.Sleep(time.Second)
+	c.Assert(leaderServer.GetRaftCluster().IsPrepared(), IsTrue)
+	// Check scheduler updated.
+	c.Assert(len(rc.GetSchedulers()), Equals, 5)
+	checkEvictLeaderSchedulerExist(c, rc, true)
+	checkEvictLeaderStoreIDs(c, rc, []uint64{1, 2})
+
+	// transfer PD leader back to the previous PD
+	tc.ResignLeader()
+	rc1.Stop()
+	tc.WaitLeader()
+	leaderServer = tc.GetServer(tc.GetLeader())
+	rc = leaderServer.GetServer().GetRaftCluster()
+	rc.Start(leaderServer.GetServer())
+	c.Assert(rc, NotNil)
+	// region heartbeat
+	id = leaderServer.GetAllocator()
+	s.putRegionWithLeader(c, rc, id, 1)
+	time.Sleep(time.Second)
+	c.Assert(leaderServer.GetRaftCluster().IsPrepared(), IsTrue)
+	// Check scheduler updated
+	c.Assert(len(rc.GetSchedulers()), Equals, 5)
+	checkEvictLeaderSchedulerExist(c, rc, true)
+	checkEvictLeaderStoreIDs(c, rc, []uint64{1, 2})
+
+	c.Assert(failpoint.Disable("github.com/tikv/pd/server/cluster/changeCoordinatorTicker"), IsNil)
+}
+
+func checkEvictLeaderSchedulerExist(c *C, rc *cluster.RaftCluster, exist bool) {
+	isExistScheduler := func(rc *cluster.RaftCluster, name string) bool {
+		s := rc.GetSchedulers()
+		for _, scheduler := range s {
+			if scheduler == name {
+				return true
+			}
+		}
+		return false
+	}
+
+	testutil.WaitUntil(c, func() bool {
+		return isExistScheduler(rc, schedulers.EvictLeaderName) == exist
+	})
+}
+
+func checkEvictLeaderStoreIDs(c *C, rc *cluster.RaftCluster, expected []uint64) {
+	handler, ok := rc.GetSchedulerHandlers()[schedulers.EvictLeaderName]
+	c.Assert(ok, IsTrue)
+	h, ok := handler.(interface {
+		EvictStoreIDs() []uint64
+	})
+	c.Assert(ok, IsTrue)
+	var evictStoreIDs []uint64
+	testutil.WaitUntil(c, func() bool {
+		evictStoreIDs = h.EvictStoreIDs()
+		return len(evictStoreIDs) == len(expected)
+	})
+}
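checkEvictLeaderStoreIDs reaches the handler's store list through an anonymous interface assertion, which avoids importing the concrete evict-leader handler type. A tiny self-contained illustration of that Go pattern follows; the types here are made up for the example, and the plain interface{} stands in for whatever rc.GetSchedulerHandlers() actually hands back.

package main

import "fmt"

// evictLeaderHandler stands in for the concrete scheduler handler type that
// the test deliberately avoids importing.
type evictLeaderHandler struct{ stores []uint64 }

func (h *evictLeaderHandler) EvictStoreIDs() []uint64 { return h.stores }

func main() {
	// The caller only holds an interface value, not the concrete type.
	var handler interface{} = &evictLeaderHandler{stores: []uint64{1, 2}}

	// Assert against an anonymous interface: any value with an
	// EvictStoreIDs() []uint64 method satisfies it, no named type needed.
	h, ok := handler.(interface {
		EvictStoreIDs() []uint64
	})
	if !ok {
		panic("handler does not expose EvictStoreIDs")
	}
	fmt.Println(h.EvictStoreIDs()) // [1 2]
}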
