diff --git a/pkg/mcs/scheduling/server/config/config.go b/pkg/mcs/scheduling/server/config/config.go index e0e5bb9b661..5b80612744b 100644 --- a/pkg/mcs/scheduling/server/config/config.go +++ b/pkg/mcs/scheduling/server/config/config.go @@ -19,7 +19,6 @@ import ( "os" "path/filepath" "strings" - "sync" "sync/atomic" "time" "unsafe" @@ -204,8 +203,6 @@ type PersistConfig struct { schedule atomic.Value replication atomic.Value storeConfig atomic.Value - // Store the respective configurations for different schedulers. - schedulerConfig sync.Map } // NewPersistConfig creates a new PersistConfig instance. @@ -275,24 +272,6 @@ func (o *PersistConfig) GetStoreConfig() *sc.StoreConfig { return o.storeConfig.Load().(*sc.StoreConfig) } -// SetSchedulerConfig sets the scheduler configuration with the given name. -func (o *PersistConfig) SetSchedulerConfig(name, data string) { - o.schedulerConfig.Store(name, data) -} - -// RemoveSchedulerConfig removes the scheduler configuration with the given name. -func (o *PersistConfig) RemoveSchedulerConfig(name string) { - o.schedulerConfig.Delete(name) -} - -// GetSchedulerConfig returns the scheduler configuration with the given name. -func (o *PersistConfig) GetSchedulerConfig(name string) string { - if v, ok := o.schedulerConfig.Load(name); ok { - return v.(string) - } - return "" -} - // GetMaxReplicas returns the max replicas. func (o *PersistConfig) GetMaxReplicas() int { return int(o.GetReplicationConfig().MaxReplicas) diff --git a/pkg/mcs/scheduling/server/config/watcher.go b/pkg/mcs/scheduling/server/config/watcher.go index e6e204b8631..d65f1a6b553 100644 --- a/pkg/mcs/scheduling/server/config/watcher.go +++ b/pkg/mcs/scheduling/server/config/watcher.go @@ -23,6 +23,7 @@ import ( "github.com/coreos/go-semver/semver" "github.com/pingcap/log" sc "github.com/tikv/pd/pkg/schedule/config" + "github.com/tikv/pd/pkg/storage" "github.com/tikv/pd/pkg/storage/endpoint" "github.com/tikv/pd/pkg/utils/etcdutil" "go.etcd.io/etcd/clientv3" @@ -50,6 +51,9 @@ type Watcher struct { schedulerConfigWatcher *etcdutil.LoopWatcher *PersistConfig + // Some data, like the scheduler configs, should be loaded into the storage + // to make sure the coordinator could access them correctly. + storage storage.Storage } type persistedConfig struct { @@ -65,6 +69,7 @@ func NewWatcher( etcdClient *clientv3.Client, clusterID uint64, persistConfig *PersistConfig, + storage storage.Storage, ) (*Watcher, error) { ctx, cancel := context.WithCancel(ctx) cw := &Watcher{ @@ -74,6 +79,7 @@ func NewWatcher( schedulerConfigPathPrefix: endpoint.SchedulerConfigPathPrefix(clusterID), etcdClient: etcdClient, PersistConfig: persistConfig, + storage: storage, } err := cw.initializeConfigWatcher() if err != nil { @@ -120,15 +126,15 @@ func (cw *Watcher) initializeConfigWatcher() error { func (cw *Watcher) initializeSchedulerConfigWatcher() error { prefixToTrim := cw.schedulerConfigPathPrefix + "/" putFn := func(kv *mvccpb.KeyValue) error { - cw.SetSchedulerConfig( + return cw.storage.SaveScheduleConfig( strings.TrimPrefix(string(kv.Key), prefixToTrim), - string(kv.Value), + kv.Value, ) - return nil } deleteFn := func(kv *mvccpb.KeyValue) error { - cw.RemoveSchedulerConfig(strings.TrimPrefix(string(kv.Key), prefixToTrim)) - return nil + return cw.storage.RemoveScheduleConfig( + strings.TrimPrefix(string(kv.Key), prefixToTrim), + ) } postEventFn := func() error { return nil diff --git a/pkg/mcs/scheduling/server/server.go b/pkg/mcs/scheduling/server/server.go index 1e3aea41aa5..8a74abf4b5e 100644 --- a/pkg/mcs/scheduling/server/server.go +++ b/pkg/mcs/scheduling/server/server.go @@ -432,11 +432,11 @@ func (s *Server) startServer() (err error) { func (s *Server) startCluster(context.Context) error { s.basicCluster = core.NewBasicCluster() + s.storage = endpoint.NewStorageEndpoint(kv.NewMemoryKV(), nil) err := s.startWatcher() if err != nil { return err } - s.storage = endpoint.NewStorageEndpoint(kv.NewMemoryKV(), nil) s.hbStreams = hbstream.NewHeartbeatStreams(s.Context(), s.clusterID, s.basicCluster) s.cluster, err = NewCluster(s.Context(), s.persistConfig, s.storage, s.basicCluster, s.hbStreams, s.clusterID, s.checkMembershipCh) if err != nil { @@ -458,7 +458,7 @@ func (s *Server) startWatcher() (err error) { if err != nil { return err } - s.configWatcher, err = config.NewWatcher(s.Context(), s.GetClient(), s.clusterID, s.persistConfig) + s.configWatcher, err = config.NewWatcher(s.Context(), s.GetClient(), s.clusterID, s.persistConfig, s.storage) if err != nil { return err } diff --git a/pkg/schedule/coordinator.go b/pkg/schedule/coordinator.go index 7e21919b214..ab0500d0445 100644 --- a/pkg/schedule/coordinator.go +++ b/pkg/schedule/coordinator.go @@ -444,6 +444,8 @@ func (c *Coordinator) InitSchedulers(needRun bool) { if err = c.schedulers.AddScheduler(s); err != nil { log.Error("can not add scheduler with independent configuration", zap.String("scheduler-name", s.GetName()), zap.Strings("scheduler-args", cfg.Args), errs.ZapError(err)) } + } else if err = c.schedulers.AddSchedulerHandler(s); err != nil { + log.Error("can not add scheduler handler with independent configuration", zap.String("scheduler-name", s.GetName()), zap.Strings("scheduler-args", cfg.Args), errs.ZapError(err)) } } @@ -472,6 +474,8 @@ func (c *Coordinator) InitSchedulers(needRun bool) { scheduleCfg.Schedulers[k] = schedulerCfg k++ } + } else if err = c.schedulers.AddSchedulerHandler(s, schedulerCfg.Args...); err != nil && !errors.ErrorEqual(err, errs.ErrSchedulerExisted.FastGenByArgs()) { + log.Error("can not add scheduler handler", zap.String("scheduler-name", s.GetName()), zap.Strings("scheduler-args", schedulerCfg.Args), errs.ZapError(err)) } } @@ -507,6 +511,7 @@ func (c *Coordinator) LoadPlugin(pluginPath string, ch chan string) { return } log.Info("create scheduler", zap.String("scheduler-name", s.GetName())) + // TODO: handle the plugin in API service mode. if err = c.schedulers.AddScheduler(s); err != nil { log.Error("can't add scheduler", zap.String("scheduler-name", s.GetName()), errs.ZapError(err)) return diff --git a/pkg/schedule/schedulers/scheduler.go b/pkg/schedule/schedulers/scheduler.go index b4c425047cd..909acb3494c 100644 --- a/pkg/schedule/schedulers/scheduler.go +++ b/pkg/schedule/schedulers/scheduler.go @@ -91,8 +91,10 @@ func ConfigSliceDecoder(name string, args []string) ConfigDecoder { // CreateSchedulerFunc is for creating scheduler. type CreateSchedulerFunc func(opController *operator.Controller, storage endpoint.ConfigStorage, dec ConfigDecoder, removeSchedulerCb ...func(string) error) (Scheduler, error) -var schedulerMap = make(map[string]CreateSchedulerFunc) -var schedulerArgsToDecoder = make(map[string]ConfigSliceDecoderBuilder) +var ( + schedulerMap = make(map[string]CreateSchedulerFunc) + schedulerArgsToDecoder = make(map[string]ConfigSliceDecoderBuilder) +) // RegisterScheduler binds a scheduler creator. It should be called in init() // func of a package. diff --git a/pkg/schedule/schedulers/scheduler_controller.go b/pkg/schedule/schedulers/scheduler_controller.go index 1c6329fb0b1..c8f07f56678 100644 --- a/pkg/schedule/schedulers/scheduler_controller.go +++ b/pkg/schedule/schedulers/scheduler_controller.go @@ -40,22 +40,28 @@ var denySchedulersByLabelerCounter = labeler.LabelerEventCounter.WithLabelValues // Controller is used to manage all schedulers. type Controller struct { sync.RWMutex - wg sync.WaitGroup - ctx context.Context - cluster sche.SchedulerCluster - storage endpoint.ConfigStorage - schedulers map[string]*ScheduleController - opController *operator.Controller + wg sync.WaitGroup + ctx context.Context + cluster sche.SchedulerCluster + storage endpoint.ConfigStorage + // schedulers is used to manage all schedulers, which will only be initialized + // and used in the PD leader service mode now. + schedulers map[string]*ScheduleController + // schedulerHandlers is used to manage the HTTP handlers of schedulers, + // which will only be initialized and used in the API service mode now. + schedulerHandlers map[string]http.Handler + opController *operator.Controller } // NewController creates a scheduler controller. func NewController(ctx context.Context, cluster sche.SchedulerCluster, storage endpoint.ConfigStorage, opController *operator.Controller) *Controller { return &Controller{ - ctx: ctx, - cluster: cluster, - storage: storage, - schedulers: make(map[string]*ScheduleController), - opController: opController, + ctx: ctx, + cluster: cluster, + storage: storage, + schedulers: make(map[string]*ScheduleController), + schedulerHandlers: make(map[string]http.Handler), + opController: opController, } } @@ -86,6 +92,9 @@ func (c *Controller) GetSchedulerNames() []string { func (c *Controller) GetSchedulerHandlers() map[string]http.Handler { c.RLock() defer c.RUnlock() + if len(c.schedulerHandlers) > 0 { + return c.schedulerHandlers + } handlers := make(map[string]http.Handler, len(c.schedulers)) for name, scheduler := range c.schedulers { handlers[name] = scheduler.Scheduler @@ -117,6 +126,50 @@ func (c *Controller) ResetSchedulerMetrics() { schedulerStatusGauge.Reset() } +// AddSchedulerHandler adds the HTTP handler for a scheduler. +func (c *Controller) AddSchedulerHandler(scheduler Scheduler, args ...string) error { + c.Lock() + defer c.Unlock() + + name := scheduler.GetName() + if _, ok := c.schedulerHandlers[name]; ok { + return errs.ErrSchedulerExisted.FastGenByArgs() + } + + c.schedulerHandlers[name] = scheduler + c.cluster.GetSchedulerConfig().AddSchedulerCfg(scheduler.GetType(), args) + return nil +} + +// RemoveSchedulerHandler removes the HTTP handler for a scheduler. +func (c *Controller) RemoveSchedulerHandler(name string) error { + c.Lock() + defer c.Unlock() + if c.cluster == nil { + return errs.ErrNotBootstrapped.FastGenByArgs() + } + s, ok := c.schedulerHandlers[name] + if !ok { + return errs.ErrSchedulerNotFound.FastGenByArgs() + } + + conf := c.cluster.GetSchedulerConfig() + conf.RemoveSchedulerCfg(s.(Scheduler).GetType()) + if err := conf.Persist(c.storage); err != nil { + log.Error("the option can not persist scheduler config", errs.ZapError(err)) + return err + } + + if err := c.storage.RemoveScheduleConfig(name); err != nil { + log.Error("can not remove the scheduler config", errs.ZapError(err)) + return err + } + + delete(c.schedulerHandlers, name) + + return nil +} + // AddScheduler adds a scheduler. func (c *Controller) AddScheduler(scheduler Scheduler, args ...string) error { c.Lock() @@ -249,8 +302,9 @@ func (c *Controller) IsSchedulerExisted(name string) (bool, error) { if c.cluster == nil { return false, errs.ErrNotBootstrapped.FastGenByArgs() } - _, ok := c.schedulers[name] - if !ok { + _, existScheduler := c.schedulers[name] + _, existHandler := c.schedulerHandlers[name] + if !existScheduler && !existHandler { return false, errs.ErrSchedulerNotFound.FastGenByArgs() } return true, nil diff --git a/server/cluster/cluster.go b/server/cluster/cluster.go index b16de73f84e..18bf66b8188 100644 --- a/server/cluster/cluster.go +++ b/server/cluster/cluster.go @@ -795,6 +795,16 @@ func (c *RaftCluster) GetSchedulerHandlers() map[string]http.Handler { return c.coordinator.GetSchedulersController().GetSchedulerHandlers() } +// AddSchedulerHandler adds a scheduler handler. +func (c *RaftCluster) AddSchedulerHandler(scheduler schedulers.Scheduler, args ...string) error { + return c.coordinator.GetSchedulersController().AddSchedulerHandler(scheduler, args...) +} + +// RemoveSchedulerHandler removes a scheduler handler. +func (c *RaftCluster) RemoveSchedulerHandler(name string) error { + return c.coordinator.GetSchedulersController().RemoveSchedulerHandler(name) +} + // AddScheduler adds a scheduler. func (c *RaftCluster) AddScheduler(scheduler schedulers.Scheduler, args ...string) error { return c.coordinator.GetSchedulersController().AddScheduler(scheduler, args...) diff --git a/server/handler.go b/server/handler.go index adc1e8ecd31..a90f8e3f04f 100644 --- a/server/handler.go +++ b/server/handler.go @@ -236,19 +236,24 @@ func (h *Handler) AddScheduler(name string, args ...string) error { return err } log.Info("create scheduler", zap.String("scheduler-name", s.GetName()), zap.Strings("scheduler-args", args)) - if !h.s.IsAPIServiceMode() { + if h.s.IsAPIServiceMode() { + if err = c.AddSchedulerHandler(s, args...); err != nil { + log.Error("can not add scheduler handler", zap.String("scheduler-name", s.GetName()), zap.Strings("scheduler-args", args), errs.ZapError(err)) + return err + } + log.Info("add scheduler handler successfully", zap.String("scheduler-name", name), zap.Strings("scheduler-args", args)) + } else { if err = c.AddScheduler(s, args...); err != nil { log.Error("can not add scheduler", zap.String("scheduler-name", s.GetName()), zap.Strings("scheduler-args", args), errs.ZapError(err)) return err } - } else { - c.GetSchedulerConfig().AddSchedulerCfg(s.GetType(), args) + log.Info("add scheduler successfully", zap.String("scheduler-name", name), zap.Strings("scheduler-args", args)) } if err = h.opt.Persist(c.GetStorage()); err != nil { log.Error("can not persist scheduler config", errs.ZapError(err)) return err } - log.Info("add scheduler successfully", zap.String("scheduler-name", name), zap.Strings("scheduler-args", args)) + log.Info("persist scheduler config successfully", zap.String("scheduler-name", name), zap.Strings("scheduler-args", args)) return nil } @@ -258,24 +263,18 @@ func (h *Handler) RemoveScheduler(name string) error { if err != nil { return err } - if !h.s.IsAPIServiceMode() { + if h.s.IsAPIServiceMode() { + if err = c.RemoveSchedulerHandler(name); err != nil { + log.Error("can not remove scheduler handler", zap.String("scheduler-name", name), errs.ZapError(err)) + } else { + log.Info("remove scheduler handler successfully", zap.String("scheduler-name", name)) + } + } else { if err = c.RemoveScheduler(name); err != nil { log.Error("can not remove scheduler", zap.String("scheduler-name", name), errs.ZapError(err)) } else { log.Info("remove scheduler successfully", zap.String("scheduler-name", name)) } - } else { - conf := c.GetSchedulerConfig() - c.GetSchedulerConfig().RemoveSchedulerCfg(schedulers.FindSchedulerTypeByName(name)) - if err := conf.Persist(c.GetStorage()); err != nil { - log.Error("the option can not persist scheduler config", errs.ZapError(err)) - return err - } - - if err := c.GetStorage().RemoveScheduleConfig(name); err != nil { - log.Error("can not remove the scheduler config", errs.ZapError(err)) - return err - } } return err } diff --git a/tests/integrations/mcs/scheduling/config_test.go b/tests/integrations/mcs/scheduling/config_test.go index 032cb4ad7ae..14c99baa6f5 100644 --- a/tests/integrations/mcs/scheduling/config_test.go +++ b/tests/integrations/mcs/scheduling/config_test.go @@ -16,16 +16,24 @@ package scheduling import ( "context" + "fmt" "testing" + "time" + "github.com/pingcap/kvproto/pkg/metapb" "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" + "github.com/tikv/pd/pkg/core" "github.com/tikv/pd/pkg/mcs/scheduling/server/config" sc "github.com/tikv/pd/pkg/schedule/config" "github.com/tikv/pd/pkg/schedule/schedulers" + "github.com/tikv/pd/pkg/slice" + "github.com/tikv/pd/pkg/storage/endpoint" + "github.com/tikv/pd/pkg/storage/kv" "github.com/tikv/pd/pkg/utils/testutil" "github.com/tikv/pd/pkg/versioninfo" "github.com/tikv/pd/tests" + "github.com/tikv/pd/tests/server/api" ) type configTestSuite struct { @@ -77,6 +85,7 @@ func (suite *configTestSuite) TestConfigWatch() { suite.pdLeaderServer.GetEtcdClient(), suite.cluster.GetCluster().GetId(), config.NewPersistConfig(config.NewConfig()), + endpoint.NewStorageEndpoint(kv.NewMemoryKV(), nil), ) re.NoError(err) // Check the initial config value. @@ -128,39 +137,92 @@ func (suite *configTestSuite) TestSchedulerConfigWatch() { // Make sure the config is persisted before the watcher is created. persistConfig(re, suite.pdLeaderServer) // Create a config watcher. + storage := endpoint.NewStorageEndpoint(kv.NewMemoryKV(), nil) watcher, err := config.NewWatcher( suite.ctx, suite.pdLeaderServer.GetEtcdClient(), suite.cluster.GetCluster().GetId(), config.NewPersistConfig(config.NewConfig()), + storage, ) re.NoError(err) // Get all default scheduler names. - var schedulerNames, _, _ = suite.pdLeaderServer.GetRaftCluster().GetStorage().LoadAllScheduleConfig() - + var namesFromAPIServer, _, _ = suite.pdLeaderServer.GetRaftCluster().GetStorage().LoadAllScheduleConfig() testutil.Eventually(re, func() bool { - targetCount := len(sc.DefaultSchedulers) - return len(schedulerNames) == targetCount + return len(namesFromAPIServer) == len(sc.DefaultSchedulers) }) // Check all default schedulers' configs. - for _, schedulerName := range schedulerNames { - testutil.Eventually(re, func() bool { - return len(watcher.GetSchedulerConfig(schedulerName)) > 0 - }) - } + var namesFromSchedulingServer []string + testutil.Eventually(re, func() bool { + namesFromSchedulingServer, _, err = storage.LoadAllScheduleConfig() + re.NoError(err) + return len(namesFromSchedulingServer) == len(namesFromAPIServer) + }) + re.Equal(namesFromAPIServer, namesFromSchedulingServer) // Add a new scheduler. - err = suite.pdLeaderServer.GetServer().GetHandler().AddEvictLeaderScheduler(1) - re.NoError(err) + api.MustAddScheduler(re, suite.pdLeaderServer.GetAddr(), schedulers.EvictLeaderName, map[string]interface{}{ + "store_id": 1, + }) // Check the new scheduler's config. testutil.Eventually(re, func() bool { - return len(watcher.GetSchedulerConfig(schedulers.EvictLeaderName)) > 0 + namesFromSchedulingServer, _, err = storage.LoadAllScheduleConfig() + re.NoError(err) + return slice.Contains(namesFromSchedulingServer, schedulers.EvictLeaderName) }) - // Remove the scheduler. - err = suite.pdLeaderServer.GetServer().GetHandler().RemoveScheduler(schedulers.EvictLeaderName) + // Update the scheduler by adding a store. + err = suite.pdLeaderServer.GetServer().GetRaftCluster().PutStore( + &metapb.Store{ + Id: 2, + Address: "mock://2", + State: metapb.StoreState_Up, + NodeState: metapb.NodeState_Serving, + LastHeartbeat: time.Now().UnixNano(), + Version: "4.0.0", + }, + ) re.NoError(err) + api.MustAddScheduler(re, suite.pdLeaderServer.GetAddr(), schedulers.EvictLeaderName, map[string]interface{}{ + "store_id": 2, + }) + assertEvictLeaderStoreIDs(re, storage, []uint64{1, 2}) + // Update the scheduler by removing a store. + api.MustDeleteScheduler(re, suite.pdLeaderServer.GetAddr(), fmt.Sprintf("%s-%d", schedulers.EvictLeaderName, 1)) + assertEvictLeaderStoreIDs(re, storage, []uint64{2}) + // Delete the scheduler. + api.MustDeleteScheduler(re, suite.pdLeaderServer.GetAddr(), schedulers.EvictLeaderName) // Check the removed scheduler's config. testutil.Eventually(re, func() bool { - return len(watcher.GetSchedulerConfig(schedulers.EvictLeaderName)) == 0 + namesFromSchedulingServer, _, err = storage.LoadAllScheduleConfig() + re.NoError(err) + return !slice.Contains(namesFromSchedulingServer, schedulers.EvictLeaderName) }) watcher.Close() } + +func assertEvictLeaderStoreIDs( + re *require.Assertions, storage *endpoint.StorageEndpoint, storeIDs []uint64, +) { + var ( + namesFromSchedulingServer, configs []string + err error + evictLeaderCfg struct { + StoreIDWithRanges map[uint64][]core.KeyRange `json:"store-id-ranges"` + } + ) + testutil.Eventually(re, func() bool { + namesFromSchedulingServer, configs, err = storage.LoadAllScheduleConfig() + re.NoError(err) + for idx, name := range namesFromSchedulingServer { + if name == schedulers.EvictLeaderName { + err = schedulers.DecodeConfig([]byte(configs[idx]), &evictLeaderCfg) + re.NoError(err) + return len(evictLeaderCfg.StoreIDWithRanges) == len(storeIDs) + } + } + return false + }) + // Validate the updated scheduler's config. + for _, storeID := range storeIDs { + re.Contains(evictLeaderCfg.StoreIDWithRanges, storeID) + } +} diff --git a/tests/server/api/api_test.go b/tests/server/api/api_test.go index 61d47d7790c..da86a872045 100644 --- a/tests/server/api/api_test.go +++ b/tests/server/api/api_test.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package api_test +package api import ( "bytes" @@ -44,13 +44,6 @@ import ( "go.uber.org/goleak" ) -// dialClient used to dial http request. -var dialClient = &http.Client{ - Transport: &http.Transport{ - DisableKeepAlives: true, - }, -} - func TestMain(m *testing.M) { goleak.VerifyTestMain(m, testutil.LeakOptions...) } diff --git a/tests/server/api/testutil.go b/tests/server/api/testutil.go new file mode 100644 index 00000000000..7a33bf39048 --- /dev/null +++ b/tests/server/api/testutil.go @@ -0,0 +1,70 @@ +// Copyright 2023 TiKV Project Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package api + +import ( + "bytes" + "encoding/json" + "fmt" + "io" + "net/http" + + "github.com/stretchr/testify/require" +) + +const schedulersPrefix = "/pd/api/v1/schedulers" + +// dialClient used to dial http request. +var dialClient = &http.Client{ + Transport: &http.Transport{ + DisableKeepAlives: true, + }, +} + +// MustAddScheduler adds a scheduler with HTTP API. +func MustAddScheduler( + re *require.Assertions, serverAddr string, + schedulerName string, args map[string]interface{}, +) { + request := map[string]interface{}{ + "name": schedulerName, + } + for arg, val := range args { + request[arg] = val + } + data, err := json.Marshal(request) + re.NoError(err) + httpReq, err := http.NewRequest(http.MethodPost, fmt.Sprintf("%s%s", serverAddr, schedulersPrefix), bytes.NewBuffer(data)) + re.NoError(err) + // Send request. + resp, err := dialClient.Do(httpReq) + re.NoError(err) + defer resp.Body.Close() + data, err = io.ReadAll(resp.Body) + re.NoError(err) + re.Equal(http.StatusOK, resp.StatusCode, string(data)) +} + +// MustDeleteScheduler deletes a scheduler with HTTP API. +func MustDeleteScheduler(re *require.Assertions, serverAddr, schedulerName string) { + httpReq, err := http.NewRequest(http.MethodDelete, fmt.Sprintf("%s%s/%s", serverAddr, schedulersPrefix, schedulerName), nil) + re.NoError(err) + resp, err := dialClient.Do(httpReq) + re.NoError(err) + defer resp.Body.Close() + data, err := io.ReadAll(resp.Body) + re.NoError(err) + re.Equal(http.StatusOK, resp.StatusCode, string(data)) +}