Skip to content

Commit

Permalink
Merge branch 'sche-redirect9' of github.com:lhy1024/pd into sche-redi…
Browse files Browse the repository at this point in the history
…rect9-1
  • Loading branch information
lhy1024 committed Nov 2, 2023
2 parents d64d311 + ab06849 commit 8849eef
Show file tree
Hide file tree
Showing 4 changed files with 32 additions and 120 deletions.
37 changes: 0 additions & 37 deletions pkg/mcs/scheduling/server/apis/v1/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,9 +131,6 @@ func (s *Service) RegisterAdminRouter() {
func (s *Service) RegisterConfigRouter() {
router := s.root.Group("config")
router.GET("", getConfig)
router.GET("/schedule", getScheduleConfig)
router.GET("/replicate", getReplicationConfig)
router.GET("/store", getStoreConfig)
}

// RegisterSchedulersRouter registers the router of the schedulers handler.
Expand Down Expand Up @@ -208,40 +205,6 @@ func getConfig(c *gin.Context) {
c.IndentedJSON(http.StatusOK, cfg)
}

// @Tags config
// @Summary Get schedule config.
// @Produce json
// @Success 200 {object} config.ScheduleConfig
// @Router /config/schedule [get]
func getScheduleConfig(c *gin.Context) {
svr := c.MustGet(multiservicesapi.ServiceContextKey).(*scheserver.Server)
cfg := svr.GetScheduleConfig()
cfg.MaxMergeRegionKeys = cfg.GetMaxMergeRegionKeys()
c.IndentedJSON(http.StatusOK, cfg)
}

// @Tags config
// @Summary Get replication config.
// @Produce json
// @Success 200 {object} config.ReplicationConfig
// @Router /config/replicate [get]
func getReplicationConfig(c *gin.Context) {
svr := c.MustGet(multiservicesapi.ServiceContextKey).(*scheserver.Server)
cfg := svr.GetReplicationConfig()
c.IndentedJSON(http.StatusOK, cfg)
}

// @Tags config
// @Summary Get store config.
// @Produce json
// @Success 200 {object} config.StoreConfig
// @Router /config/store [get]
func getStoreConfig(c *gin.Context) {
svr := c.MustGet(multiservicesapi.ServiceContextKey).(*scheserver.Server)
cfg := svr.GetStoreConfig()
c.IndentedJSON(http.StatusOK, cfg)
}

// @Tags admin
// @Summary Drop all regions from cache.
// @Produce json
Expand Down
16 changes: 0 additions & 16 deletions pkg/mcs/scheduling/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@ import (
"github.com/tikv/pd/pkg/mcs/utils"
"github.com/tikv/pd/pkg/member"
"github.com/tikv/pd/pkg/schedule"
sc "github.com/tikv/pd/pkg/schedule/config"
"github.com/tikv/pd/pkg/schedule/hbstream"
"github.com/tikv/pd/pkg/schedule/schedulers"
"github.com/tikv/pd/pkg/storage/endpoint"
Expand Down Expand Up @@ -528,21 +527,6 @@ func (s *Server) GetConfig() *config.Config {
return cfg
}

// GetScheduleConfig gets the schedule config.
func (s *Server) GetScheduleConfig() *sc.ScheduleConfig {
return s.persistConfig.GetScheduleConfig().Clone()
}

// GetReplicationConfig gets the replication config.
func (s *Server) GetReplicationConfig() *sc.ReplicationConfig {
return s.persistConfig.GetReplicationConfig().Clone()
}

// GetStoreConfig gets the store config.
func (s *Server) GetStoreConfig() *sc.StoreConfig {
return s.persistConfig.GetStoreConfig().Clone()
}

// CreateServer creates the Server
func CreateServer(ctx context.Context, cfg *config.Config) *Server {
svr := &Server{
Expand Down
49 changes: 20 additions & 29 deletions server/api/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,20 +63,13 @@ func newConfHandler(svr *server.Server, rd *render.Render) *confHandler {
func (h *confHandler) GetConfig(w http.ResponseWriter, r *http.Request) {
cfg := h.svr.GetConfig()
if h.svr.IsAPIServiceMode() {
b, err := h.GetSchedulingServerConfig("config")
schedulingServerConfig, err := h.GetSchedulingServerConfig()
if err != nil {
h.rd.JSON(w, http.StatusInternalServerError, err.Error())
return
}
var configSchedulingServer config.Config
err = json.Unmarshal(b, &configSchedulingServer)
if err != nil {
h.rd.JSON(w, http.StatusInternalServerError, err.Error())
return
}
cfg.Schedule = configSchedulingServer.Schedule
cfg.Replication = configSchedulingServer.Replication
// TODO: will we support config/store?
cfg.Schedule = schedulingServerConfig.Schedule
cfg.Replication = schedulingServerConfig.Replication
} else {
cfg.Schedule.MaxMergeRegionKeys = cfg.Schedule.GetMaxMergeRegionKeys()
}
Expand Down Expand Up @@ -321,18 +314,13 @@ func getConfigMap(cfg map[string]interface{}, key []string, value interface{}) m
// @Router /config/schedule [get]
func (h *confHandler) GetScheduleConfig(w http.ResponseWriter, r *http.Request) {
if h.svr.IsAPIServiceMode() {
b, err := h.GetSchedulingServerConfig("config/schedule")
cfg, err := h.GetSchedulingServerConfig()
if err != nil {
h.rd.JSON(w, http.StatusInternalServerError, err.Error())
return
}
var cfg sc.ScheduleConfig
err = json.Unmarshal(b, &cfg)
if err != nil {
h.rd.JSON(w, http.StatusInternalServerError, err.Error())
return
}
h.rd.JSON(w, http.StatusOK, cfg)
cfg.Schedule.SchedulersPayload = nil
h.rd.JSON(w, http.StatusOK, cfg.Schedule)
return
}
cfg := h.svr.GetScheduleConfig()
Expand Down Expand Up @@ -399,18 +387,12 @@ func (h *confHandler) SetScheduleConfig(w http.ResponseWriter, r *http.Request)
// @Router /config/replicate [get]
func (h *confHandler) GetReplicationConfig(w http.ResponseWriter, r *http.Request) {
if h.svr.IsAPIServiceMode() {
b, err := h.GetSchedulingServerConfig("config/replicate")
if err != nil {
h.rd.JSON(w, http.StatusInternalServerError, err.Error())
return
}
var cfg sc.ReplicationConfig
err = json.Unmarshal(b, &cfg)
cfg, err := h.GetSchedulingServerConfig()
if err != nil {
h.rd.JSON(w, http.StatusInternalServerError, err.Error())
return
}
h.rd.JSON(w, http.StatusOK, cfg)
h.rd.JSON(w, http.StatusOK, cfg.Replication)
return
}
h.rd.JSON(w, http.StatusOK, h.svr.GetReplicationConfig())
Expand Down Expand Up @@ -555,12 +537,12 @@ func (h *confHandler) GetPDServerConfig(w http.ResponseWriter, r *http.Request)
h.rd.JSON(w, http.StatusOK, h.svr.GetPDServerConfig())
}

func (h *confHandler) GetSchedulingServerConfig(path string) ([]byte, error) {
func (h *confHandler) GetSchedulingServerConfig() (*config.Config, error) {
addr, ok := h.svr.GetServicePrimaryAddr(h.svr.Context(), utils.SchedulingServiceName)
if !ok {
return nil, errs.ErrNotFoundSchedulingAddr.FastGenByArgs()
}
url := fmt.Sprintf("%s/scheduling/api/v1/%s", addr, path)
url := fmt.Sprintf("%s/scheduling/api/v1/config", addr)
req, err := http.NewRequest(http.MethodGet, url, nil)
if err != nil {
return nil, err
Expand All @@ -573,5 +555,14 @@ func (h *confHandler) GetSchedulingServerConfig(path string) ([]byte, error) {
if resp.StatusCode != http.StatusOK {
return nil, errs.ErrSchedulingServer.FastGenByArgs(resp.StatusCode)
}
return io.ReadAll(resp.Body)
b, err := io.ReadAll(resp.Body)
if err != nil {
return nil, err
}
var schedulingServerConfig config.Config
err = json.Unmarshal(b, &schedulingServerConfig)
if err != nil {
return nil, err
}
return &schedulingServerConfig, nil
}
50 changes: 12 additions & 38 deletions tests/integrations/mcs/scheduling/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import (
"github.com/tikv/pd/pkg/core"
_ "github.com/tikv/pd/pkg/mcs/scheduling/server/apis/v1"
"github.com/tikv/pd/pkg/mcs/scheduling/server/config"
sc "github.com/tikv/pd/pkg/schedule/config"
"github.com/tikv/pd/pkg/schedule/handler"
"github.com/tikv/pd/pkg/statistics"
"github.com/tikv/pd/pkg/storage"
Expand Down Expand Up @@ -271,21 +270,6 @@ func (suite *apiTestSuite) TestConfig() {
suite.Contains(cfg.Schedule.SchedulersPayload, "balance-hot-region-scheduler")
suite.Contains(cfg.Schedule.SchedulersPayload, "balance-witness-scheduler")
suite.Contains(cfg.Schedule.SchedulersPayload, "transfer-witness-leader-scheduler")

var scheduleCfg sc.ScheduleConfig
testutil.ReadGetJSON(re, testDialClient, urlPrefix+"/schedule", &scheduleCfg)
suite.Equal(scheduleCfg.LeaderScheduleLimit, s.GetScheduleConfig().LeaderScheduleLimit)
suite.Equal(scheduleCfg.EnableCrossTableMerge, s.GetScheduleConfig().EnableCrossTableMerge)

var replicationCfg sc.ReplicationConfig
testutil.ReadGetJSON(re, testDialClient, urlPrefix+"/replicate", &replicationCfg)
suite.Equal(replicationCfg.MaxReplicas, s.GetReplicationConfig().MaxReplicas)
suite.Equal(replicationCfg.LocationLabels, s.GetReplicationConfig().LocationLabels)

var storeCfg sc.StoreConfig
testutil.ReadGetJSON(re, testDialClient, urlPrefix+"/store", &storeCfg)
suite.Equal(storeCfg.Coprocessor.RegionMaxKeys, s.GetStoreConfig().Coprocessor.RegionMaxKeys)
suite.Equal(storeCfg.Coprocessor.RegionSplitKeys, s.GetStoreConfig().Coprocessor.RegionSplitKeys)
}
env := tests.NewSchedulingTestEnvironment(suite.T())
env.RunTestInAPIMode(checkConfig)
Expand All @@ -295,38 +279,28 @@ func TestConfigForward(t *testing.T) {
re := require.New(t)
checkConfigForward := func(cluster *tests.TestCluster) {
sche := cluster.GetSchedulingPrimaryServer()
opts := sche.GetPersistConfig()
var cfg map[string]interface{}
addr := cluster.GetLeaderServer().GetAddr()

// Test config
urlPrefix := fmt.Sprintf("%s/pd/api/v1/config", addr)
testutil.ReadGetJSON(re, testDialClient, urlPrefix, &cfg)
re.Equal(cfg["schedule"].(map[string]interface{})["leader-schedule-limit"], float64(sche.GetScheduleConfig().LeaderScheduleLimit))
// Test to change config
sche.GetPersistConfig().GetScheduleConfig().LeaderScheduleLimit = 100
re.Equal(100, int(sche.GetScheduleConfig().LeaderScheduleLimit))
testutil.ReadGetJSON(re, testDialClient, urlPrefix, &cfg)
re.Equal(100., cfg["schedule"].(map[string]interface{})["leader-schedule-limit"])
re.Equal(cfg["schedule"].(map[string]interface{})["leader-schedule-limit"],
float64(opts.GetLeaderScheduleLimit()))

// Test schedule
urlPrefix = fmt.Sprintf("%s/pd/api/v1/config/schedule", addr)
testutil.ReadGetJSON(re, testDialClient, urlPrefix, &cfg)
re.Equal(cfg["leader-schedule-limit"], float64(sche.GetScheduleConfig().LeaderScheduleLimit))
// Test to change config
sche.GetPersistConfig().GetScheduleConfig().LeaderScheduleLimit = 4
re.Equal(4, int(sche.GetScheduleConfig().LeaderScheduleLimit))
testutil.ReadGetJSON(re, testDialClient, urlPrefix, &cfg)
re.Equal(4., cfg["leader-schedule-limit"])
// Test to change config only in scheduling server
// Expect to get new config in scheduling server but not old config in api server

// Test replicate
urlPrefix = fmt.Sprintf("%s/pd/api/v1/config/replicate", addr)
opts.GetScheduleConfig().LeaderScheduleLimit = 100
re.Equal(100, int(opts.GetLeaderScheduleLimit()))
testutil.ReadGetJSON(re, testDialClient, urlPrefix, &cfg)
re.Equal(cfg["max-replicas"], float64(sche.GetReplicationConfig().MaxReplicas))
// Test to change config
sche.GetPersistConfig().GetReplicationConfig().MaxReplicas = 5
re.Equal(5, int(sche.GetReplicationConfig().MaxReplicas))
re.Equal(100., cfg["schedule"].(map[string]interface{})["leader-schedule-limit"])

opts.GetReplicationConfig().MaxReplicas = 5
re.Equal(5, int(opts.GetReplicationConfig().MaxReplicas))
testutil.ReadGetJSON(re, testDialClient, urlPrefix, &cfg)
re.Equal(5., cfg["max-replicas"])
re.Equal(5., cfg["replication"].(map[string]interface{})["max-replicas"])
}
env := tests.NewSchedulingTestEnvironment(t)
env.RunTestInAPIMode(checkConfigForward)
Expand Down

0 comments on commit 8849eef

Please sign in to comment.