Skip to content

Commit

Permalink
scheduler: enhance output when scheduler config update (#6927)
Browse files Browse the repository at this point in the history
close #4607

Signed-off-by: lhy1024 <[email protected]>
  • Loading branch information
lhy1024 authored Aug 31, 2023
1 parent 13dd27b commit a0b2f3d
Show file tree
Hide file tree
Showing 10 changed files with 118 additions and 65 deletions.
7 changes: 4 additions & 3 deletions pkg/schedule/schedulers/balance_leader.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,17 +90,18 @@ func (conf *balanceLeaderSchedulerConfig) Update(data []byte) (int, interface{})
return http.StatusBadRequest, "invalid batch size which should be an integer between 1 and 10"
}
conf.persistLocked()
return http.StatusOK, "success"
log.Info("balance-leader-scheduler config is updated", zap.ByteString("old", oldc), zap.ByteString("new", newc))
return http.StatusOK, "Config is updated."
}
m := make(map[string]interface{})
if err := json.Unmarshal(data, &m); err != nil {
return http.StatusInternalServerError, err.Error()
}
ok := reflectutil.FindSameFieldByJSON(conf, m)
if ok {
return http.StatusOK, "no changed"
return http.StatusOK, "Config is the same with origin, so do nothing."
}
return http.StatusBadRequest, "config item not found"
return http.StatusBadRequest, "Config item is not found."
}

func (conf *balanceLeaderSchedulerConfig) validate() bool {
Expand Down
7 changes: 4 additions & 3 deletions pkg/schedule/schedulers/balance_witness.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,17 +76,18 @@ func (conf *balanceWitnessSchedulerConfig) Update(data []byte) (int, interface{}
return http.StatusBadRequest, "invalid batch size which should be an integer between 1 and 10"
}
conf.persistLocked()
return http.StatusOK, "success"
log.Info("balance-witness-scheduler config is updated", zap.ByteString("old", oldc), zap.ByteString("new", newc))
return http.StatusOK, "Config is updated."
}
m := make(map[string]interface{})
if err := json.Unmarshal(data, &m); err != nil {
return http.StatusInternalServerError, err.Error()
}
ok := reflectutil.FindSameFieldByJSON(conf, m)
if ok {
return http.StatusOK, "no changed"
return http.StatusOK, "Config is the same with origin, so do nothing."
}
return http.StatusBadRequest, "config item not found"
return http.StatusBadRequest, "Config item is not found."
}

func (conf *balanceWitnessSchedulerConfig) validate() bool {
Expand Down
8 changes: 5 additions & 3 deletions pkg/schedule/schedulers/hot_region_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -421,7 +421,9 @@ func (conf *hotRegionSchedulerConfig) handleSetConfig(w http.ResponseWriter, r *
newc, _ := json.Marshal(conf)
if !bytes.Equal(oldc, newc) {
conf.persistLocked()
rd.Text(w, http.StatusOK, "success")
log.Info("hot-region-scheduler config is updated", zap.String("old", string(oldc)), zap.String("new", string(newc)))
rd.Text(w, http.StatusOK, "Config is updated.")
return
}

m := make(map[string]interface{})
Expand All @@ -431,11 +433,11 @@ func (conf *hotRegionSchedulerConfig) handleSetConfig(w http.ResponseWriter, r *
}
ok := reflectutil.FindSameFieldByJSON(conf, m)
if ok {
rd.Text(w, http.StatusOK, "no changed")
rd.Text(w, http.StatusOK, "Config is the same with origin, so do nothing.")
return
}

rd.Text(w, http.StatusBadRequest, "config item not found")
rd.Text(w, http.StatusBadRequest, "Config item is not found.")
}

func (conf *hotRegionSchedulerConfig) persistLocked() error {
Expand Down
2 changes: 1 addition & 1 deletion pkg/schedule/schedulers/shuffle_region_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ func (conf *shuffleRegionSchedulerConfig) handleSetRoles(w http.ResponseWriter,
rd.Text(w, http.StatusInternalServerError, err.Error())
return
}
rd.Text(w, http.StatusOK, "")
rd.Text(w, http.StatusOK, "Config is updated.")
}

func (conf *shuffleRegionSchedulerConfig) persist() error {
Expand Down
17 changes: 15 additions & 2 deletions pkg/schedule/schedulers/split_bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/tikv/pd/pkg/schedule/plan"
"github.com/tikv/pd/pkg/statistics/buckets"
"github.com/tikv/pd/pkg/storage/endpoint"
"github.com/tikv/pd/pkg/utils/reflectutil"
"github.com/tikv/pd/pkg/utils/syncutil"
"github.com/unrolled/render"
)
Expand Down Expand Up @@ -121,10 +122,22 @@ func (h *splitBucketHandler) UpdateConfig(w http.ResponseWriter, r *http.Request
newc, _ := json.Marshal(h.conf)
if !bytes.Equal(oldc, newc) {
h.conf.persistLocked()
rd.Text(w, http.StatusOK, "success")
rd.Text(w, http.StatusOK, "Config is updated.")
return
}

m := make(map[string]interface{})
if err := json.Unmarshal(data, &m); err != nil {
rd.JSON(w, http.StatusInternalServerError, err.Error())
return
}
ok := reflectutil.FindSameFieldByJSON(h.conf, m)
if ok {
rd.Text(w, http.StatusOK, "Config is the same with origin, so do nothing.")
return
}

rd.Text(w, http.StatusBadRequest, "config item not found")
rd.Text(w, http.StatusBadRequest, "Config item is not found.")
}

func newSplitBucketHandler(conf *splitBucketSchedulerConfig) http.Handler {
Expand Down
12 changes: 7 additions & 5 deletions server/api/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@
package api

import (
"fmt"
"net/http"
"net/url"
"strings"
"time"

Expand All @@ -29,8 +29,6 @@ import (
"github.com/unrolled/render"
)

const schedulerConfigPrefix = "pd/api/v1/scheduler-config"

type schedulerHandler struct {
*server.Handler
svr *server.Server
Expand Down Expand Up @@ -321,8 +319,12 @@ func (h *schedulerHandler) handleErr(w http.ResponseWriter, err error) {
func (h *schedulerHandler) redirectSchedulerDelete(w http.ResponseWriter, name, schedulerName string) {
args := strings.Split(name, "-")
args = args[len(args)-1:]
url := fmt.Sprintf("%s/%s/%s/delete/%s", h.GetAddr(), schedulerConfigPrefix, schedulerName, args[0])
statusCode, err := apiutil.DoDelete(h.svr.GetHTTPClient(), url)
deleteURL, err := url.JoinPath(h.GetAddr(), "pd", server.SchedulerConfigHandlerPath, schedulerName, "delete", args[0])
if err != nil {
h.r.JSON(w, http.StatusInternalServerError, err.Error())
return
}
statusCode, err := apiutil.DoDelete(h.svr.GetHTTPClient(), deleteURL)
if err != nil {
h.r.JSON(w, statusCode, err.Error())
return
Expand Down
81 changes: 70 additions & 11 deletions server/api/scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ func (suite *scheduleTestSuite) TestAPI() {
// update again
err = tu.CheckPostJSON(testDialClient, updateURL, body,
tu.StatusOK(re),
tu.StringEqual(re, "\"no changed\"\n"))
tu.StringEqual(re, "\"Config is the same with origin, so do nothing.\"\n"))
suite.NoError(err)
// update invalidate batch
dataMap = map[string]interface{}{}
Expand All @@ -173,7 +173,7 @@ func (suite *scheduleTestSuite) TestAPI() {
suite.NoError(err)
err = tu.CheckPostJSON(testDialClient, updateURL, body,
tu.Status(re, http.StatusBadRequest),
tu.StringEqual(re, "\"config item not found\"\n"))
tu.StringEqual(re, "\"Config item is not found.\"\n"))
suite.NoError(err)
},
},
Expand All @@ -184,18 +184,29 @@ func (suite *scheduleTestSuite) TestAPI() {
resp := make(map[string]interface{})
listURL := fmt.Sprintf("%s%s%s/%s/list", suite.svr.GetAddr(), apiPrefix, server.SchedulerConfigHandlerPath, name)
suite.NoError(tu.ReadGetJSON(re, testDialClient, listURL, &resp))
expectMap := map[string]float64{
"min-hot-byte-rate": 100,
"min-hot-key-rate": 10,
"max-zombie-rounds": 3,
"max-peer-number": 1000,
expectMap := map[string]interface{}{
"min-hot-byte-rate": 100.0,
"min-hot-key-rate": 10.0,
"min-hot-query-rate": 10.0,
"max-zombie-rounds": 3.0,
"max-peer-number": 1000.0,
"byte-rate-rank-step-ratio": 0.05,
"key-rate-rank-step-ratio": 0.05,
"query-rate-rank-step-ratio": 0.05,
"count-rank-step-ratio": 0.01,
"great-dec-ratio": 0.95,
"minor-dec-ratio": 0.99,
"src-tolerance-ratio": 1.05,
"dst-tolerance-ratio": 1.05,
"split-thresholds": 0.2,
"rank-formula-version": "v2",
"read-priorities": []interface{}{"byte", "key"},
"write-leader-priorities": []interface{}{"key", "byte"},
"write-peer-priorities": []interface{}{"byte", "key"},
"enable-for-tiflash": "true",
"strict-picking-store": "true",
}
re.Equal(len(expectMap), len(resp), "expect %v, got %v", expectMap, resp)
for key := range expectMap {
suite.Equal(expectMap[key], resp[key])
}
Expand All @@ -209,12 +220,60 @@ func (suite *scheduleTestSuite) TestAPI() {
resp = make(map[string]interface{})
suite.NoError(tu.ReadGetJSON(re, testDialClient, listURL, &resp))
for key := range expectMap {
suite.Equal(expectMap[key], resp[key])
suite.Equal(expectMap[key], resp[key], "key %s", key)
}
// update again
err = tu.CheckPostJSON(testDialClient, updateURL, body,
tu.StatusOK(re),
tu.StringEqual(re, "no changed"))
tu.StringEqual(re, "Config is the same with origin, so do nothing."))
suite.NoError(err)
// config item not found
dataMap = map[string]interface{}{}
dataMap["error"] = 3
body, err = json.Marshal(dataMap)
suite.NoError(err)
err = tu.CheckPostJSON(testDialClient, updateURL, body,
tu.Status(re, http.StatusBadRequest),
tu.StringEqual(re, "Config item is not found."))
suite.NoError(err)
},
},
{
name: "split-bucket-scheduler",
createdName: "split-bucket-scheduler",
extraTestFunc: func(name string) {
resp := make(map[string]interface{})
listURL := fmt.Sprintf("%s%s%s/%s/list", suite.svr.GetAddr(), apiPrefix, server.SchedulerConfigHandlerPath, name)
suite.NoError(tu.ReadGetJSON(re, testDialClient, listURL, &resp))
suite.Equal(3.0, resp["degree"])
suite.Equal(0.0, resp["split-limit"])
dataMap := make(map[string]interface{})
dataMap["degree"] = 4
updateURL := fmt.Sprintf("%s%s%s/%s/config", suite.svr.GetAddr(), apiPrefix, server.SchedulerConfigHandlerPath, name)
body, err := json.Marshal(dataMap)
suite.NoError(err)
suite.NoError(tu.CheckPostJSON(testDialClient, updateURL, body, tu.StatusOK(re)))
resp = make(map[string]interface{})
suite.NoError(tu.ReadGetJSON(re, testDialClient, listURL, &resp))
suite.Equal(4.0, resp["degree"])
// update again
err = tu.CheckPostJSON(testDialClient, updateURL, body,
tu.StatusOK(re),
tu.StringEqual(re, "Config is the same with origin, so do nothing."))
suite.NoError(err)
// empty body
err = tu.CheckPostJSON(testDialClient, updateURL, nil,
tu.Status(re, http.StatusInternalServerError),
tu.StringEqual(re, "\"unexpected end of JSON input\"\n"))
suite.NoError(err)
// config item not found
dataMap = map[string]interface{}{}
dataMap["error"] = 3
body, err = json.Marshal(dataMap)
suite.NoError(err)
err = tu.CheckPostJSON(testDialClient, updateURL, body,
tu.Status(re, http.StatusBadRequest),
tu.StringEqual(re, "Config item is not found."))
suite.NoError(err)
},
},
Expand Down Expand Up @@ -254,7 +313,7 @@ func (suite *scheduleTestSuite) TestAPI() {
// update again
err = tu.CheckPostJSON(testDialClient, updateURL, body,
tu.StatusOK(re),
tu.StringEqual(re, "\"no changed\"\n"))
tu.StringEqual(re, "\"Config is the same with origin, so do nothing.\"\n"))
suite.NoError(err)
// update invalidate batch
dataMap = map[string]interface{}{}
Expand All @@ -280,7 +339,7 @@ func (suite *scheduleTestSuite) TestAPI() {
suite.NoError(err)
err = tu.CheckPostJSON(testDialClient, updateURL, body,
tu.Status(re, http.StatusBadRequest),
tu.StringEqual(re, "\"config item not found\"\n"))
tu.StringEqual(re, "\"Config item is not found.\"\n"))
suite.NoError(err)
},
},
Expand Down
9 changes: 5 additions & 4 deletions server/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ import (
"bytes"
"encoding/hex"
"encoding/json"
"fmt"
"net/http"
"net/url"
"path"
"strconv"
"strings"
Expand Down Expand Up @@ -76,8 +76,6 @@ var (
ErrPluginNotFound = func(pluginPath string) error {
return errors.Errorf("plugin is not found: %s", pluginPath)
}

schedulerConfigPrefix = "pd/api/v1/scheduler-config"
)

// Handler is a helper to export methods to handle API/RPC requests.
Expand Down Expand Up @@ -1096,7 +1094,10 @@ func (h *Handler) redirectSchedulerUpdate(name string, storeID float64) error {
input := make(map[string]interface{})
input["name"] = name
input["store_id"] = storeID
updateURL := fmt.Sprintf("%s/%s/%s/config", h.GetAddr(), schedulerConfigPrefix, name)
updateURL, err := url.JoinPath(h.GetAddr(), "pd", SchedulerConfigHandlerPath, name, "config")
if err != nil {
return err
}
body, err := json.Marshal(input)
if err != nil {
return err
Expand Down
12 changes: 6 additions & 6 deletions tools/pd-ctl/pdctl/command/global.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,8 +173,8 @@ func requestJSON(cmd *cobra.Command, method, prefix string, input map[string]int
}

endpoints := getEndpoints(cmd)
var msg []byte
err = tryURLs(cmd, endpoints, func(endpoint string) error {
var msg []byte
var req *http.Request
var resp *http.Response
url := endpoint + "/" + prefix
Expand All @@ -194,11 +194,11 @@ func requestJSON(cmd *cobra.Command, method, prefix string, input map[string]int
return err
}
defer resp.Body.Close()
msg, err = io.ReadAll(resp.Body)
if err != nil {
return err
}
if resp.StatusCode != http.StatusOK {
msg, err = io.ReadAll(resp.Body)
if err != nil {
return err
}
return errors.Errorf("[%d] %s", resp.StatusCode, msg)
}
return nil
Expand All @@ -207,7 +207,7 @@ func requestJSON(cmd *cobra.Command, method, prefix string, input map[string]int
cmd.Printf("Failed! %s\n", err)
return
}
cmd.Println("Success!")
cmd.Printf("Success! %s\n", strings.Trim(string(msg), "\""))
}

func postJSON(cmd *cobra.Command, prefix string, input map[string]interface{}) {
Expand Down
28 changes: 1 addition & 27 deletions tools/pd-ctl/pdctl/command/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import (

"github.com/pingcap/errors"
"github.com/spf13/cobra"
"github.com/tikv/pd/pkg/statistics/utils"
)

var (
Expand Down Expand Up @@ -714,32 +713,7 @@ func postSchedulerConfigCommandFunc(cmd *cobra.Command, schedulerName string, ar
val = value
}
if schedulerName == "balance-hot-region-scheduler" && (key == "read-priorities" || key == "write-leader-priorities" || key == "write-peer-priorities") {
priorities := make([]string, 0)
prioritiesMap := make(map[string]struct{})
for _, priority := range strings.Split(value, ",") {
if priority != utils.BytePriority && priority != utils.KeyPriority && priority != utils.QueryPriority {
cmd.Println(fmt.Sprintf("priority should be one of [%s, %s, %s]",
utils.BytePriority,
utils.QueryPriority,
utils.KeyPriority))
return
}
if priority == utils.QueryPriority && key == "write-peer-priorities" {
cmd.Println("query is not allowed to be set in priorities for write-peer-priorities")
return
}
priorities = append(priorities, priority)
prioritiesMap[priority] = struct{}{}
}
if len(priorities) < 2 {
cmd.Println("priorities should have at least 2 dimensions")
return
}
input[key] = priorities
if len(priorities) != len(prioritiesMap) {
cmd.Println("priorities shouldn't be repeated")
return
}
input[key] = strings.Split(value, ",")
} else {
input[key] = val
}
Expand Down

0 comments on commit a0b2f3d

Please sign in to comment.