Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: reset config if the input is invalid #8632

Merged
merged 8 commits into from
Sep 24, 2024
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions pkg/schedule/schedulers/evict_leader.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,12 @@
return false, errs.ErrScheduleConfigNotExist.FastGenByArgs()
}

func (conf *evictLeaderSchedulerConfig) removeStore(id uint64) {
conf.Lock()
defer conf.Unlock()
conf.removeStoreLocked(id)

Check failure on line 109 in pkg/schedule/schedulers/evict_leader.go

View workflow job for this annotation

GitHub Actions / statics

Error return value of `conf.removeStoreLocked` is not checked (errcheck)
rleungx marked this conversation as resolved.
Show resolved Hide resolved
}

func (conf *evictLeaderSchedulerConfig) resetStoreLocked(id uint64, keyRange []core.KeyRange) {
if err := conf.cluster.PauseLeaderTransfer(id); err != nil {
log.Error("pause leader transfer failed", zap.Uint64("store-id", id), errs.ZapError(err))
Expand Down Expand Up @@ -408,6 +414,7 @@
batchFloat, ok := input["batch"].(float64)
if ok {
if batchFloat < 1 || batchFloat > 10 {
handler.config.removeStore(id)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For invalid config parameters, It's better to validate it, do nothing, and return 4xx + error message to let the users know it, it's not classic and unsafe to reset memory content when meeting invalid inputs.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's a rollback operation since the memory state has been changed in line 407.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Got it, make sense

Copy link
Member

@okJiang okJiang Sep 24, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  1. Do we allow the input to not contain the store id? If this, we can check if !inputHasStoreID in L405, and assume that all subsequent cases are inputHasStoreID by default.
  2. How about puting L407(pauseLeaderTransferIfStoreNotExist) behind all checks over? In this case, we don't need to removeStore for each case.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  1. Yes, we allow only changing batch size.
  2. range relies on exist, only batch can be moved, but no much difference I think.

Copy link
Member

@okJiang okJiang Sep 24, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about adding a defer after L411? For now, it is so easy to forget to removeStore, like L445.

defer func () {
    if err != nil {
         ....
    }
}()

handler.rd.JSON(w, http.StatusBadRequest, "batch is invalid, it should be in [1, 10]")
return
}
Expand All @@ -417,6 +424,7 @@
ranges, ok := (input["ranges"]).([]string)
if ok {
if !inputHasStoreID {
handler.config.removeStore(id)
handler.rd.JSON(w, http.StatusInternalServerError, errs.ErrSchedulerConfig.FastGenByArgs("id"))
return
}
Expand All @@ -426,6 +434,7 @@

newRanges, err = getKeyRanges(ranges)
if err != nil {
handler.config.removeStore(id)
handler.rd.JSON(w, http.StatusInternalServerError, err.Error())
return
}
Expand Down
3 changes: 2 additions & 1 deletion pkg/schedule/schedulers/grant_leader.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,12 +271,13 @@ func (handler *grantLeaderHandler) updateConfig(w http.ResponseWriter, r *http.R

err := handler.config.buildWithArgs(args)
if err != nil {
_, _ = handler.config.removeStore(id)
handler.rd.JSON(w, http.StatusBadRequest, err.Error())
return
}
err = handler.config.persist()
if err != nil {
handler.config.removeStore(id)
_, _ = handler.config.removeStore(id)
handler.rd.JSON(w, http.StatusInternalServerError, err.Error())
return
}
Expand Down
15 changes: 10 additions & 5 deletions plugin/scheduler_example/evict_leader.go
Original file line number Diff line number Diff line change
Expand Up @@ -275,11 +275,18 @@

err := handler.config.BuildWithArgs(args)
if err != nil {
handler.config.mu.Lock()
handler.config.cluster.ResumeLeaderTransfer(id)
handler.config.mu.Unlock()
handler.rd.JSON(w, http.StatusBadRequest, err.Error())
return
}
err = handler.config.Persist()
if err != nil {
handler.config.mu.Lock()
delete(handler.config.StoreIDWitRanges, id)
handler.config.cluster.ResumeLeaderTransfer(id)
handler.config.mu.Unlock()
handler.rd.JSON(w, http.StatusInternalServerError, err.Error())
return
}
Expand All @@ -301,22 +308,20 @@

handler.config.mu.Lock()
defer handler.config.mu.Unlock()
_, exists := handler.config.StoreIDWitRanges[id]
ranges, exists := handler.config.StoreIDWitRanges[id]
if !exists {
handler.rd.JSON(w, http.StatusInternalServerError, errors.New("the config does not exist"))
return
}
delete(handler.config.StoreIDWitRanges, id)
handler.config.cluster.ResumeLeaderTransfer(id)

handler.config.mu.Unlock()
if err := handler.config.Persist(); err != nil {
handler.config.mu.Lock()
handler.config.StoreIDWitRanges[id] = ranges
handler.config.cluster.PauseLeaderTransfer(id)

Check failure on line 321 in plugin/scheduler_example/evict_leader.go

View workflow job for this annotation

GitHub Actions / statics

Error return value of `handler.config.cluster.PauseLeaderTransfer` is not checked (errcheck)
handler.rd.JSON(w, http.StatusInternalServerError, err.Error())
return
}
handler.config.mu.Lock()

var resp any
if len(handler.config.StoreIDWitRanges) == 0 {
resp = noStoreInSchedulerInfo
Expand Down
54 changes: 54 additions & 0 deletions tools/pd-ctl/tests/scheduler/scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -813,6 +813,60 @@ func (suite *schedulerTestSuite) checkSchedulerDiagnostic(cluster *pdTests.TestC
checkSchedulerDescribeCommand("balance-leader-scheduler", "normal", "")
}

func (suite *schedulerTestSuite) TestEvictLeaderScheduler() {
// FIXME: API mode may have the problem
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will we fix it in another PR?

suite.env.RunTestInPDMode(suite.checkEvictLeaderScheduler)
}

func (suite *schedulerTestSuite) checkEvictLeaderScheduler(cluster *pdTests.TestCluster) {
re := suite.Require()
pdAddr := cluster.GetConfig().GetClientURL()
cmd := ctl.GetRootCmd()

stores := []*metapb.Store{
{
Id: 1,
State: metapb.StoreState_Up,
LastHeartbeat: time.Now().UnixNano(),
},
{
Id: 2,
State: metapb.StoreState_Up,
LastHeartbeat: time.Now().UnixNano(),
},
{
Id: 3,
State: metapb.StoreState_Up,
LastHeartbeat: time.Now().UnixNano(),
},
{
Id: 4,
State: metapb.StoreState_Up,
LastHeartbeat: time.Now().UnixNano(),
},
}
for _, store := range stores {
pdTests.MustPutStore(re, cluster, store)
}

pdTests.MustPutRegion(re, cluster, 1, 1, []byte("a"), []byte("b"))
output, err := tests.ExecuteCommand(cmd, []string{"-u", pdAddr, "scheduler", "add", "evict-leader-scheduler", "2"}...)
re.NoError(err)
re.Contains(string(output), "Success!")
output, err = tests.ExecuteCommand(cmd, []string{"-u", pdAddr, "scheduler", "add", "evict-leader-scheduler", "1"}...)
re.NoError(err)
re.Contains(string(output), "Success!")
output, err = tests.ExecuteCommand(cmd, []string{"-u", pdAddr, "scheduler", "remove", "evict-leader-scheduler"}...)
re.NoError(err)
re.Contains(string(output), "Success!")
Copy link
Contributor

@lhy1024 lhy1024 Sep 14, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we need to check

        testutil.Eventually(re, func() bool { // wait for removed scheduler to be synced to scheduling server.
		output := mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "evict-leader-scheduler"}, nil)
		return strings.Contains(output, "[404] scheduler not found")
	})

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

seem not work

output, err = tests.ExecuteCommand(cmd, []string{"-u", pdAddr, "scheduler", "add", "evict-leader-scheduler", "1"}...)
re.NoError(err)
re.Contains(string(output), "Success!")
output, err = tests.ExecuteCommand(cmd, []string{"-u", pdAddr, "scheduler", "remove", "evict-leader-scheduler-1"}...)
re.NoError(err)
re.Contains(string(output), "Success!")
}

func mustExec(re *require.Assertions, cmd *cobra.Command, args []string, v any) string {
output, err := tests.ExecuteCommand(cmd, args...)
re.NoError(err)
Expand Down
Loading