Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

schedulers, handler: fix the evict-leader-scheduler sync #7105

Merged
merged 3 commits into from
Sep 20, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pkg/mcs/scheduling/server/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,7 @@ func (c *Cluster) updateScheduler() {
log.Info("cluster is closing, stop listening the schedulers updating notifier")
return
case <-notifier:
// This is triggered by the watcher when the schedulers are updated.
}

log.Info("schedulers updating notifier is triggered, try to update the scheduler")
Expand Down
13 changes: 13 additions & 0 deletions pkg/schedule/schedulers/evict_leader.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,19 @@ func (s *evictLeaderScheduler) ReloadConfig() error {
if err = DecodeConfig([]byte(cfgData), newCfg); err != nil {
return err
}
// Resume and pause the leader transfer for each store.
for id := range s.conf.StoreIDWithRanges {
if _, ok := newCfg.StoreIDWithRanges[id]; ok {
continue
}
s.conf.cluster.ResumeLeaderTransfer(id)
}
for id := range newCfg.StoreIDWithRanges {
if _, ok := s.conf.StoreIDWithRanges[id]; ok {
continue
}
s.conf.cluster.PauseLeaderTransfer(id)
}
s.conf.StoreIDWithRanges = newCfg.StoreIDWithRanges
return nil
}
Expand Down
8 changes: 7 additions & 1 deletion server/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,13 @@ func (h *Handler) AddScheduler(name string, args ...string) error {
return err
}

s, err := schedulers.CreateScheduler(name, c.GetOperatorController(), h.s.storage, schedulers.ConfigSliceDecoder(name, args), c.GetCoordinator().GetSchedulersController().RemoveScheduler)
var removeSchedulerCb func(string) error
if h.s.IsAPIServiceMode() {
removeSchedulerCb = c.GetCoordinator().GetSchedulersController().RemoveSchedulerHandler
} else {
removeSchedulerCb = c.GetCoordinator().GetSchedulersController().RemoveScheduler
}
s, err := schedulers.CreateScheduler(name, c.GetOperatorController(), h.s.storage, schedulers.ConfigSliceDecoder(name, args), removeSchedulerCb)
if err != nil {
return err
}
Expand Down
92 changes: 64 additions & 28 deletions tests/integrations/mcs/scheduling/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,14 @@ package scheduling
import (
"context"
"fmt"
"net/http"
"testing"
"time"

"github.com/pingcap/failpoint"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/kvproto/pkg/pdpb"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
mcs "github.com/tikv/pd/pkg/mcs/utils"
"github.com/tikv/pd/pkg/schedule/schedulers"
Expand Down Expand Up @@ -196,25 +198,15 @@ func (suite *serverTestSuite) TestSchedulerSync() {
defer tc.Destroy()
tc.WaitForPrimaryServing(re)
schedulersController := tc.GetPrimaryServer().GetCluster().GetCoordinator().GetSchedulersController()
re.Len(schedulersController.GetSchedulerNames(), 5)
re.Nil(schedulersController.GetScheduler(schedulers.EvictLeaderName))
checkEvictLeaderSchedulerExist(re, schedulersController, false)
// Add a new evict-leader-scheduler through the API server.
api.MustAddScheduler(re, suite.backendEndpoints, schedulers.EvictLeaderName, map[string]interface{}{
"store_id": 1,
})
// Check if the evict-leader-scheduler is added.
testutil.Eventually(re, func() bool {
return len(schedulersController.GetSchedulerNames()) == 6 &&
schedulersController.GetScheduler(schedulers.EvictLeaderName) != nil
})
handler, ok := schedulersController.GetSchedulerHandlers()[schedulers.EvictLeaderName]
re.True(ok)
h, ok := handler.(interface {
EvictStoreIDs() []uint64
})
re.True(ok)
re.ElementsMatch(h.EvictStoreIDs(), []uint64{1})
// Update the evict-leader-scheduler through the API server.
checkEvictLeaderSchedulerExist(re, schedulersController, true)
checkEvictLeaderStoreIDs(re, schedulersController, []uint64{1})
// Add a store_id to the evict-leader-scheduler through the API server.
err = suite.pdLeader.GetServer().GetRaftCluster().PutStore(
&metapb.Store{
Id: 2,
Expand All @@ -229,25 +221,69 @@ func (suite *serverTestSuite) TestSchedulerSync() {
api.MustAddScheduler(re, suite.backendEndpoints, schedulers.EvictLeaderName, map[string]interface{}{
"store_id": 2,
})
var evictStoreIDs []uint64
testutil.Eventually(re, func() bool {
evictStoreIDs = h.EvictStoreIDs()
return len(evictStoreIDs) == 2
checkEvictLeaderSchedulerExist(re, schedulersController, true)
checkEvictLeaderStoreIDs(re, schedulersController, []uint64{1, 2})
// Delete a store_id from the evict-leader-scheduler through the API server.
api.MustDeleteScheduler(re, suite.backendEndpoints, fmt.Sprintf("%s-%d", schedulers.EvictLeaderName, 1))
checkEvictLeaderSchedulerExist(re, schedulersController, true)
checkEvictLeaderStoreIDs(re, schedulersController, []uint64{2})
// Add a store_id to the evict-leader-scheduler through the API server by the scheduler handler.
api.MustCallSchedulerConfigAPI(re, http.MethodPost, suite.backendEndpoints, schedulers.EvictLeaderName, []string{"config"}, map[string]interface{}{
"name": schedulers.EvictLeaderName,
"store_id": 1,
})
re.ElementsMatch(evictStoreIDs, []uint64{1, 2})
checkEvictLeaderSchedulerExist(re, schedulersController, true)
checkEvictLeaderStoreIDs(re, schedulersController, []uint64{1, 2})
// Delete a store_id from the evict-leader-scheduler through the API server by the scheduler handler.
api.MustCallSchedulerConfigAPI(re, http.MethodDelete, suite.backendEndpoints, schedulers.EvictLeaderName, []string{"delete", "2"}, nil)
checkEvictLeaderSchedulerExist(re, schedulersController, true)
checkEvictLeaderStoreIDs(re, schedulersController, []uint64{1})
// If the last store is deleted, the scheduler should be removed.
api.MustCallSchedulerConfigAPI(re, http.MethodDelete, suite.backendEndpoints, schedulers.EvictLeaderName, []string{"delete", "1"}, nil)
// Check if the scheduler is removed.
checkEvictLeaderSchedulerExist(re, schedulersController, false)

// Delete the evict-leader-scheduler through the API server by removing the last store_id.
api.MustAddScheduler(re, suite.backendEndpoints, schedulers.EvictLeaderName, map[string]interface{}{
"store_id": 1,
})
checkEvictLeaderSchedulerExist(re, schedulersController, true)
checkEvictLeaderStoreIDs(re, schedulersController, []uint64{1})
api.MustDeleteScheduler(re, suite.backendEndpoints, fmt.Sprintf("%s-%d", schedulers.EvictLeaderName, 1))
testutil.Eventually(re, func() bool {
evictStoreIDs = h.EvictStoreIDs()
return len(evictStoreIDs) == 1
checkEvictLeaderSchedulerExist(re, schedulersController, false)

// Delete the evict-leader-scheduler through the API server.
api.MustAddScheduler(re, suite.backendEndpoints, schedulers.EvictLeaderName, map[string]interface{}{
"store_id": 1,
})
re.ElementsMatch(evictStoreIDs, []uint64{2})
// Remove the evict-leader-scheduler through the API server.
checkEvictLeaderSchedulerExist(re, schedulersController, true)
checkEvictLeaderStoreIDs(re, schedulersController, []uint64{1})
api.MustDeleteScheduler(re, suite.backendEndpoints, schedulers.EvictLeaderName)
// Check if the scheduler is removed.
checkEvictLeaderSchedulerExist(re, schedulersController, false)

// TODO: test more schedulers.
}

func checkEvictLeaderSchedulerExist(re *require.Assertions, sc *schedulers.Controller, exist bool) {
testutil.Eventually(re, func() bool {
return len(schedulersController.GetSchedulerNames()) == 5 &&
schedulersController.GetScheduler(schedulers.EvictLeaderName) == nil
if !exist {
return sc.GetScheduler(schedulers.EvictLeaderName) == nil
}
return sc.GetScheduler(schedulers.EvictLeaderName) != nil
})
}

// TODO: test more schedulers.
func checkEvictLeaderStoreIDs(re *require.Assertions, sc *schedulers.Controller, expected []uint64) {
handler, ok := sc.GetSchedulerHandlers()[schedulers.EvictLeaderName]
re.True(ok)
h, ok := handler.(interface {
EvictStoreIDs() []uint64
})
re.True(ok)
var evictStoreIDs []uint64
testutil.Eventually(re, func() bool {
evictStoreIDs = h.EvictStoreIDs()
return len(evictStoreIDs) == len(expected)
})
re.ElementsMatch(evictStoreIDs, expected)
}
25 changes: 24 additions & 1 deletion tests/server/api/testutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,15 @@ import (
"fmt"
"io"
"net/http"
"path"

"github.com/stretchr/testify/require"
)

const schedulersPrefix = "/pd/api/v1/schedulers"
const (
schedulersPrefix = "/pd/api/v1/schedulers"
schedulerConfigPrefix = "/pd/api/v1/scheduler-config"
)

// dialClient used to dial http request.
var dialClient = &http.Client{
Expand Down Expand Up @@ -68,3 +72,22 @@ func MustDeleteScheduler(re *require.Assertions, serverAddr, schedulerName strin
re.NoError(err)
re.Equal(http.StatusOK, resp.StatusCode, string(data))
}

// MustCallSchedulerConfigAPI calls a scheduler config with HTTP API with the given args.
func MustCallSchedulerConfigAPI(
re *require.Assertions,
method, serverAddr, schedulerName string, args []string,
input map[string]interface{},
) {
data, err := json.Marshal(input)
re.NoError(err)
args = append([]string{schedulerConfigPrefix, schedulerName}, args...)
httpReq, err := http.NewRequest(method, fmt.Sprintf("%s%s", serverAddr, path.Join(args...)), bytes.NewBuffer(data))
re.NoError(err)
resp, err := dialClient.Do(httpReq)
re.NoError(err)
defer resp.Body.Close()
data, err = io.ReadAll(resp.Body)
re.NoError(err)
re.Equal(http.StatusOK, resp.StatusCode, string(data))
}
Loading