Skip to content

Commit

Permalink
schedulers, handler: fix the evict-leader-scheduler sync (tikv#7105)
Browse files Browse the repository at this point in the history
ref tikv#5839

- Pass the correct `removeSchedulerCb` when the server is in API mode.
- Pause and resume the store leader transfer when reloading the config.

Signed-off-by: JmPotato <[email protected]>

Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com>
  • Loading branch information
2 people authored and rleungx committed Dec 1, 2023
1 parent 131be48 commit f327342
Show file tree
Hide file tree
Showing 5 changed files with 109 additions and 30 deletions.
1 change: 1 addition & 0 deletions pkg/mcs/scheduling/server/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,7 @@ func (c *Cluster) updateScheduler() {
log.Info("cluster is closing, stop listening the schedulers updating notifier")
return
case <-notifier:
// This is triggered by the watcher when the schedulers are updated.
}

log.Info("schedulers updating notifier is triggered, try to update the scheduler")
Expand Down
13 changes: 13 additions & 0 deletions pkg/schedule/schedulers/evict_leader.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,19 @@ func (s *evictLeaderScheduler) ReloadConfig() error {
if err = DecodeConfig([]byte(cfgData), newCfg); err != nil {
return err
}
// Resume the leader transfer for stores that are no longer in the new config,
// and pause it for stores that are newly added by the new config.
for id := range s.conf.StoreIDWithRanges {
if _, ok := newCfg.StoreIDWithRanges[id]; ok {
continue
}
s.conf.cluster.ResumeLeaderTransfer(id)
}
for id := range newCfg.StoreIDWithRanges {
if _, ok := s.conf.StoreIDWithRanges[id]; ok {
continue
}
s.conf.cluster.PauseLeaderTransfer(id)
}
s.conf.StoreIDWithRanges = newCfg.StoreIDWithRanges
return nil
}
Expand Down
8 changes: 7 additions & 1 deletion server/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,13 @@ func (h *Handler) AddScheduler(name string, args ...string) error {
return err
}

s, err := schedulers.CreateScheduler(name, c.GetOperatorController(), h.s.storage, schedulers.ConfigSliceDecoder(name, args), c.GetCoordinator().GetSchedulersController().RemoveScheduler)
var removeSchedulerCb func(string) error
if h.s.IsAPIServiceMode() {
removeSchedulerCb = c.GetCoordinator().GetSchedulersController().RemoveSchedulerHandler
} else {
removeSchedulerCb = c.GetCoordinator().GetSchedulersController().RemoveScheduler
}
s, err := schedulers.CreateScheduler(name, c.GetOperatorController(), h.s.storage, schedulers.ConfigSliceDecoder(name, args), removeSchedulerCb)
if err != nil {
return err
}
Expand Down
92 changes: 64 additions & 28 deletions tests/integrations/mcs/scheduling/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,14 @@ package scheduling
import (
"context"
"fmt"
"net/http"
"testing"
"time"

"github.com/pingcap/failpoint"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/kvproto/pkg/pdpb"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
mcs "github.com/tikv/pd/pkg/mcs/utils"
"github.com/tikv/pd/pkg/schedule/schedulers"
Expand Down Expand Up @@ -196,25 +198,15 @@ func (suite *serverTestSuite) TestSchedulerSync() {
defer tc.Destroy()
tc.WaitForPrimaryServing(re)
schedulersController := tc.GetPrimaryServer().GetCluster().GetCoordinator().GetSchedulersController()
re.Len(schedulersController.GetSchedulerNames(), 5)
re.Nil(schedulersController.GetScheduler(schedulers.EvictLeaderName))
checkEvictLeaderSchedulerExist(re, schedulersController, false)
// Add a new evict-leader-scheduler through the API server.
api.MustAddScheduler(re, suite.backendEndpoints, schedulers.EvictLeaderName, map[string]interface{}{
"store_id": 1,
})
// Check if the evict-leader-scheduler is added.
testutil.Eventually(re, func() bool {
return len(schedulersController.GetSchedulerNames()) == 6 &&
schedulersController.GetScheduler(schedulers.EvictLeaderName) != nil
})
handler, ok := schedulersController.GetSchedulerHandlers()[schedulers.EvictLeaderName]
re.True(ok)
h, ok := handler.(interface {
EvictStoreIDs() []uint64
})
re.True(ok)
re.ElementsMatch(h.EvictStoreIDs(), []uint64{1})
// Update the evict-leader-scheduler through the API server.
checkEvictLeaderSchedulerExist(re, schedulersController, true)
checkEvictLeaderStoreIDs(re, schedulersController, []uint64{1})
// Add a store_id to the evict-leader-scheduler through the API server.
err = suite.pdLeader.GetServer().GetRaftCluster().PutStore(
&metapb.Store{
Id: 2,
Expand All @@ -229,25 +221,69 @@ func (suite *serverTestSuite) TestSchedulerSync() {
api.MustAddScheduler(re, suite.backendEndpoints, schedulers.EvictLeaderName, map[string]interface{}{
"store_id": 2,
})
var evictStoreIDs []uint64
testutil.Eventually(re, func() bool {
evictStoreIDs = h.EvictStoreIDs()
return len(evictStoreIDs) == 2
checkEvictLeaderSchedulerExist(re, schedulersController, true)
checkEvictLeaderStoreIDs(re, schedulersController, []uint64{1, 2})
// Delete a store_id from the evict-leader-scheduler through the API server.
api.MustDeleteScheduler(re, suite.backendEndpoints, fmt.Sprintf("%s-%d", schedulers.EvictLeaderName, 1))
checkEvictLeaderSchedulerExist(re, schedulersController, true)
checkEvictLeaderStoreIDs(re, schedulersController, []uint64{2})
// Add a store_id to the evict-leader-scheduler through the API server by the scheduler handler.
api.MustCallSchedulerConfigAPI(re, http.MethodPost, suite.backendEndpoints, schedulers.EvictLeaderName, []string{"config"}, map[string]interface{}{
"name": schedulers.EvictLeaderName,
"store_id": 1,
})
re.ElementsMatch(evictStoreIDs, []uint64{1, 2})
checkEvictLeaderSchedulerExist(re, schedulersController, true)
checkEvictLeaderStoreIDs(re, schedulersController, []uint64{1, 2})
// Delete a store_id from the evict-leader-scheduler through the API server by the scheduler handler.
api.MustCallSchedulerConfigAPI(re, http.MethodDelete, suite.backendEndpoints, schedulers.EvictLeaderName, []string{"delete", "2"}, nil)
checkEvictLeaderSchedulerExist(re, schedulersController, true)
checkEvictLeaderStoreIDs(re, schedulersController, []uint64{1})
// If the last store is deleted, the scheduler should be removed.
api.MustCallSchedulerConfigAPI(re, http.MethodDelete, suite.backendEndpoints, schedulers.EvictLeaderName, []string{"delete", "1"}, nil)
// Check if the scheduler is removed.
checkEvictLeaderSchedulerExist(re, schedulersController, false)

// Delete the evict-leader-scheduler through the API server by removing the last store_id.
api.MustAddScheduler(re, suite.backendEndpoints, schedulers.EvictLeaderName, map[string]interface{}{
"store_id": 1,
})
checkEvictLeaderSchedulerExist(re, schedulersController, true)
checkEvictLeaderStoreIDs(re, schedulersController, []uint64{1})
api.MustDeleteScheduler(re, suite.backendEndpoints, fmt.Sprintf("%s-%d", schedulers.EvictLeaderName, 1))
testutil.Eventually(re, func() bool {
evictStoreIDs = h.EvictStoreIDs()
return len(evictStoreIDs) == 1
checkEvictLeaderSchedulerExist(re, schedulersController, false)

// Delete the evict-leader-scheduler through the API server.
api.MustAddScheduler(re, suite.backendEndpoints, schedulers.EvictLeaderName, map[string]interface{}{
"store_id": 1,
})
re.ElementsMatch(evictStoreIDs, []uint64{2})
// Remove the evict-leader-scheduler through the API server.
checkEvictLeaderSchedulerExist(re, schedulersController, true)
checkEvictLeaderStoreIDs(re, schedulersController, []uint64{1})
api.MustDeleteScheduler(re, suite.backendEndpoints, schedulers.EvictLeaderName)
// Check if the scheduler is removed.
checkEvictLeaderSchedulerExist(re, schedulersController, false)

// TODO: test more schedulers.
}

// checkEvictLeaderSchedulerExist polls the given schedulers controller through
// testutil.Eventually until the presence of the evict-leader-scheduler matches
// exist: the scheduler must be registered when exist is true and absent when
// exist is false.
//
// NOTE(review): testutil.Eventually presumably fails the test via re if the
// condition never holds before its timeout — confirm against its definition.
func checkEvictLeaderSchedulerExist(re *require.Assertions, sc *schedulers.Controller, exist bool) {
	testutil.Eventually(re, func() bool {
		// Only the passed-in controller is consulted; the check deliberately
		// does not depend on the total number of schedulers.
		registered := sc.GetScheduler(schedulers.EvictLeaderName) != nil
		return registered == exist
	})
}

// TODO: test more schedulers.
// checkEvictLeaderStoreIDs asserts that the evict-leader-scheduler handler
// registered in sc reports exactly the expected store IDs, in any order.
//
// It first requires that a handler is registered under the evict-leader name
// and that it exposes EvictStoreIDs. It then polls until the number of
// reported IDs equals len(expected), and finally compares the elements.
func checkEvictLeaderStoreIDs(re *require.Assertions, sc *schedulers.Controller, expected []uint64) {
	// evictStoreIDsGetter is the minimal capability needed from the handler.
	type evictStoreIDsGetter interface {
		EvictStoreIDs() []uint64
	}

	schedulerHandler, registered := sc.GetSchedulerHandlers()[schedulers.EvictLeaderName]
	re.True(registered)
	getter, supported := schedulerHandler.(evictStoreIDsGetter)
	re.True(supported)

	// Keep the last observed IDs so they can be compared after polling ends.
	var observed []uint64
	testutil.Eventually(re, func() bool {
		observed = getter.EvictStoreIDs()
		return len(observed) == len(expected)
	})
	re.ElementsMatch(observed, expected)
}
25 changes: 24 additions & 1 deletion tests/server/api/testutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,15 @@ import (
"fmt"
"io"
"net/http"
"path"

"github.com/stretchr/testify/require"
)

const schedulersPrefix = "/pd/api/v1/schedulers"
const (
schedulersPrefix = "/pd/api/v1/schedulers"
schedulerConfigPrefix = "/pd/api/v1/scheduler-config"
)

// dialClient used to dial http request.
var dialClient = &http.Client{
Expand Down Expand Up @@ -68,3 +72,22 @@ func MustDeleteScheduler(re *require.Assertions, serverAddr, schedulerName strin
re.NoError(err)
re.Equal(http.StatusOK, resp.StatusCode, string(data))
}

// MustCallSchedulerConfigAPI sends an HTTP request with the given method to
// the scheduler-config endpoint of schedulerName on serverAddr.
//
// The elements of args are appended to the URL as extra path segments, and
// input is JSON-encoded and sent as the request body. Every step is asserted
// through re, and the response status must be 200 OK; the response body is
// attached to the assertion message.
func MustCallSchedulerConfigAPI(
	re *require.Assertions,
	method, serverAddr, schedulerName string, args []string,
	input map[string]interface{},
) {
	// Build the request path: <config prefix>/<scheduler name>/<args...>.
	segments := make([]string, 0, len(args)+2)
	segments = append(segments, schedulerConfigPrefix, schedulerName)
	segments = append(segments, args...)
	target := fmt.Sprintf("%s%s", serverAddr, path.Join(segments...))

	payload, err := json.Marshal(input)
	re.NoError(err)
	req, err := http.NewRequest(method, target, bytes.NewBuffer(payload))
	re.NoError(err)

	resp, err := dialClient.Do(req)
	re.NoError(err)
	defer resp.Body.Close()

	respBody, err := io.ReadAll(resp.Body)
	re.NoError(err)
	re.Equal(http.StatusOK, resp.StatusCode, string(respBody))
}

0 comments on commit f327342

Please sign in to comment.