diff --git a/client/http/interface.go b/client/http/interface.go index 464a8751dbd..7510ea31fcf 100644 --- a/client/http/interface.go +++ b/client/http/interface.go @@ -19,6 +19,7 @@ import ( "encoding/json" "fmt" "net/http" + "strconv" "strings" "github.com/pingcap/errors" @@ -49,6 +50,8 @@ type Client interface { GetStore(context.Context, uint64) (*StoreInfo, error) SetStoreLabels(context.Context, int64, map[string]string) error /* Config-related interfaces */ + GetConfig(context.Context) (map[string]interface{}, error) + SetConfig(context.Context, map[string]interface{}, ...float64) error GetScheduleConfig(context.Context) (map[string]interface{}, error) SetScheduleConfig(context.Context, map[string]interface{}) error GetClusterVersion(context.Context) (string, error) @@ -79,6 +82,11 @@ type Client interface { /* Scheduling-related interfaces */ AccelerateSchedule(context.Context, *KeyRange) error AccelerateScheduleInBatch(context.Context, []*KeyRange) error + /* Admin-related interfaces */ + ResetTS(context.Context, uint64, bool) error + ResetBaseAllocID(context.Context, uint64) error + SetSnapshotRecoveringMark(context.Context) error + DeleteSnapshotRecoveringMark(context.Context) error /* Other interfaces */ GetMinResolvedTSByStoresIDs(context.Context, []uint64) (uint64, map[uint64]uint64, error) GetPDVersion(context.Context) (string, error) @@ -314,6 +322,39 @@ func (c *client) SetStoreLabels(ctx context.Context, storeID int64, storeLabels WithBody(jsonInput)) } +// GetConfig gets the configurations. +func (c *client) GetConfig(ctx context.Context) (map[string]interface{}, error) { + var config map[string]interface{} + err := c.request(ctx, newRequestInfo(). + WithName(getConfigName). + WithURI(Config). + WithMethod(http.MethodGet). + WithResp(&config)) + if err != nil { + return nil, err + } + return config, nil +} + +// SetConfig sets the configurations. ttlSecond is optional. +func (c *client) SetConfig(ctx context.Context, config map[string]interface{}, ttlSecond ...float64) error { + configJSON, err := json.Marshal(config) + if err != nil { + return errors.Trace(err) + } + var uri string + if len(ttlSecond) > 0 { + uri = ConfigWithTTLSeconds(ttlSecond[0]) + } else { + uri = Config + } + return c.request(ctx, newRequestInfo(). + WithName(setConfigName). + WithURI(uri). + WithMethod(http.MethodPost). + WithBody(configJSON)) +} + // GetScheduleConfig gets the schedule configurations. func (c *client) GetScheduleConfig(ctx context.Context) (map[string]interface{}, error) { var config map[string]interface{} @@ -707,6 +748,58 @@ func (c *client) AccelerateScheduleInBatch(ctx context.Context, keyRanges []*Key WithBody(inputJSON)) } +// ResetTS resets the PD's TS. +func (c *client) ResetTS(ctx context.Context, ts uint64, forceUseLarger bool) error { + reqData, err := json.Marshal(struct { + Tso string `json:"tso"` + ForceUseLarger bool `json:"force-use-larger"` + }{ + Tso: strconv.FormatUint(ts, 10), + ForceUseLarger: forceUseLarger, + }) + if err != nil { + return errors.Trace(err) + } + return c.request(ctx, newRequestInfo(). + WithName(resetTSName). + WithURI(ResetTS). + WithMethod(http.MethodPost). + WithBody(reqData)) +} + +// ResetBaseAllocID resets the PD's base alloc ID. +func (c *client) ResetBaseAllocID(ctx context.Context, id uint64) error { + reqData, err := json.Marshal(struct { + ID string `json:"id"` + }{ + ID: strconv.FormatUint(id, 10), + }) + if err != nil { + return errors.Trace(err) + } + return c.request(ctx, newRequestInfo(). + WithName(resetBaseAllocIDName). + WithURI(BaseAllocID). + WithMethod(http.MethodPost). + WithBody(reqData)) +} + +// SetSnapshotRecoveringMark sets the snapshot recovering mark. +func (c *client) SetSnapshotRecoveringMark(ctx context.Context) error { + return c.request(ctx, newRequestInfo(). + WithName(setSnapshotRecoveringMarkName). + WithURI(SnapshotRecoveringMark). + WithMethod(http.MethodPost)) +} + +// DeleteSnapshotRecoveringMark deletes the snapshot recovering mark. +func (c *client) DeleteSnapshotRecoveringMark(ctx context.Context) error { + return c.request(ctx, newRequestInfo(). + WithName(deleteSnapshotRecoveringMarkName). + WithURI(SnapshotRecoveringMark). + WithMethod(http.MethodDelete)) +} + // SetSchedulerDelay sets the delay of given scheduler. func (c *client) SetSchedulerDelay(ctx context.Context, scheduler string, delaySec int64) error { m := map[string]int64{ diff --git a/client/http/request_info.go b/client/http/request_info.go index 611d2f63ac2..404d1e657b5 100644 --- a/client/http/request_info.go +++ b/client/http/request_info.go @@ -39,6 +39,8 @@ const ( getStoresName = "GetStores" getStoreName = "GetStore" setStoreLabelsName = "SetStoreLabels" + getConfigName = "GetConfig" + setConfigName = "SetConfig" getScheduleConfigName = "GetScheduleConfig" setScheduleConfigName = "SetScheduleConfig" getClusterVersionName = "GetClusterVersion" @@ -69,6 +71,10 @@ const ( getMinResolvedTSByStoresIDsName = "GetMinResolvedTSByStoresIDs" getMicroServiceMembersName = "GetMicroServiceMembers" getPDVersionName = "GetPDVersion" + resetTSName = "ResetTS" + resetBaseAllocIDName = "ResetBaseAllocID" + setSnapshotRecoveringMarkName = "SetSnapshotRecoveringMark" + deleteSnapshotRecoveringMarkName = "DeleteSnapshotRecoveringMark" ) type requestInfo struct { diff --git a/tests/integrations/client/http_client_test.go b/tests/integrations/client/http_client_test.go index 8c9e7722ef7..5cfd8fc25f2 100644 --- a/tests/integrations/client/http_client_test.go +++ b/tests/integrations/client/http_client_test.go @@ -32,6 +32,7 @@ import ( pd "github.com/tikv/pd/client/http" "github.com/tikv/pd/client/retry" "github.com/tikv/pd/pkg/core" + sc "github.com/tikv/pd/pkg/schedule/config" "github.com/tikv/pd/pkg/schedule/labeler" "github.com/tikv/pd/pkg/schedule/placement" "github.com/tikv/pd/pkg/utils/testutil" @@ -405,6 +406,33 @@ func (suite *httpClientTestSuite) TestAccelerateSchedule() { re.Len(suspectRegions, 2) } +func (suite *httpClientTestSuite) TestConfig() { + re := suite.Require() + config, err := suite.client.GetConfig(suite.ctx) + re.NoError(err) + re.Equal(float64(4), config["schedule"].(map[string]interface{})["leader-schedule-limit"]) + + newConfig := map[string]interface{}{ + "schedule.leader-schedule-limit": float64(8), + } + err = suite.client.SetConfig(suite.ctx, newConfig) + re.NoError(err) + + config, err = suite.client.GetConfig(suite.ctx) + re.NoError(err) + re.Equal(float64(8), config["schedule"].(map[string]interface{})["leader-schedule-limit"]) + + // Test the config with TTL. + newConfig = map[string]interface{}{ + "schedule.leader-schedule-limit": float64(16), + } + err = suite.client.SetConfig(suite.ctx, newConfig, 5) + re.NoError(err) + resp, err := suite.cluster.GetEtcdClient().Get(suite.ctx, sc.TTLConfigPrefix+"/schedule.leader-schedule-limit") + re.NoError(err) + re.Equal([]byte("16"), resp.Kvs[0].Value) +} + func (suite *httpClientTestSuite) TestScheduleConfig() { re := suite.Require() config, err := suite.client.GetScheduleConfig(suite.ctx) @@ -501,6 +529,18 @@ func (suite *httpClientTestSuite) TestVersion() { re.Equal(versioninfo.PDReleaseVersion, ver) } +func (suite *httpClientTestSuite) TestAdmin() { + re := suite.Require() + err := suite.client.SetSnapshotRecoveringMark(suite.ctx) + re.NoError(err) + err = suite.client.ResetTS(suite.ctx, 123, true) + re.NoError(err) + err = suite.client.ResetBaseAllocID(suite.ctx, 456) + re.NoError(err) + err = suite.client.DeleteSnapshotRecoveringMark(suite.ctx) + re.NoError(err) +} + func (suite *httpClientTestSuite) TestWithBackoffer() { re := suite.Require() // Should return with 404 error without backoffer.