Skip to content

Commit

Permalink
http: add Admin API and Config with TTL API (tikv#7699)
Browse files Browse the repository at this point in the history
ref tikv#7300

Signed-off-by: lance6716 <[email protected]>
  • Loading branch information
lance6716 authored Jan 15, 2024
1 parent 1514418 commit 91bca92
Show file tree
Hide file tree
Showing 3 changed files with 139 additions and 0 deletions.
93 changes: 93 additions & 0 deletions client/http/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"encoding/json"
"fmt"
"net/http"
"strconv"
"strings"

"github.com/pingcap/errors"
Expand Down Expand Up @@ -49,6 +50,8 @@ type Client interface {
GetStore(context.Context, uint64) (*StoreInfo, error)
SetStoreLabels(context.Context, int64, map[string]string) error
/* Config-related interfaces */
GetConfig(context.Context) (map[string]interface{}, error)
SetConfig(context.Context, map[string]interface{}, ...float64) error
GetScheduleConfig(context.Context) (map[string]interface{}, error)
SetScheduleConfig(context.Context, map[string]interface{}) error
GetClusterVersion(context.Context) (string, error)
Expand Down Expand Up @@ -79,6 +82,11 @@ type Client interface {
/* Scheduling-related interfaces */
AccelerateSchedule(context.Context, *KeyRange) error
AccelerateScheduleInBatch(context.Context, []*KeyRange) error
/* Admin-related interfaces */
ResetTS(context.Context, uint64, bool) error
ResetBaseAllocID(context.Context, uint64) error
SetSnapshotRecoveringMark(context.Context) error
DeleteSnapshotRecoveringMark(context.Context) error
/* Other interfaces */
GetMinResolvedTSByStoresIDs(context.Context, []uint64) (uint64, map[uint64]uint64, error)
GetPDVersion(context.Context) (string, error)
Expand Down Expand Up @@ -314,6 +322,39 @@ func (c *client) SetStoreLabels(ctx context.Context, storeID int64, storeLabels
WithBody(jsonInput))
}

// GetConfig gets the configurations.
func (c *client) GetConfig(ctx context.Context) (map[string]interface{}, error) {
var config map[string]interface{}
err := c.request(ctx, newRequestInfo().
WithName(getConfigName).
WithURI(Config).
WithMethod(http.MethodGet).
WithResp(&config))
if err != nil {
return nil, err
}
return config, nil
}

// SetConfig sets the configurations. ttlSecond is optional.
func (c *client) SetConfig(ctx context.Context, config map[string]interface{}, ttlSecond ...float64) error {
configJSON, err := json.Marshal(config)
if err != nil {
return errors.Trace(err)
}
var uri string
if len(ttlSecond) > 0 {
uri = ConfigWithTTLSeconds(ttlSecond[0])
} else {
uri = Config
}
return c.request(ctx, newRequestInfo().
WithName(setConfigName).
WithURI(uri).
WithMethod(http.MethodPost).
WithBody(configJSON))
}

// GetScheduleConfig gets the schedule configurations.
func (c *client) GetScheduleConfig(ctx context.Context) (map[string]interface{}, error) {
var config map[string]interface{}
Expand Down Expand Up @@ -707,6 +748,58 @@ func (c *client) AccelerateScheduleInBatch(ctx context.Context, keyRanges []*Key
WithBody(inputJSON))
}

// ResetTS resets the PD's TS.
func (c *client) ResetTS(ctx context.Context, ts uint64, forceUseLarger bool) error {
reqData, err := json.Marshal(struct {
Tso string `json:"tso"`
ForceUseLarger bool `json:"force-use-larger"`
}{
Tso: strconv.FormatUint(ts, 10),
ForceUseLarger: forceUseLarger,
})
if err != nil {
return errors.Trace(err)
}
return c.request(ctx, newRequestInfo().
WithName(resetTSName).
WithURI(ResetTS).
WithMethod(http.MethodPost).
WithBody(reqData))
}

// ResetBaseAllocID resets the PD's base alloc ID.
func (c *client) ResetBaseAllocID(ctx context.Context, id uint64) error {
reqData, err := json.Marshal(struct {
ID string `json:"id"`
}{
ID: strconv.FormatUint(id, 10),
})
if err != nil {
return errors.Trace(err)
}
return c.request(ctx, newRequestInfo().
WithName(resetBaseAllocIDName).
WithURI(BaseAllocID).
WithMethod(http.MethodPost).
WithBody(reqData))
}

// SetSnapshotRecoveringMark sets the snapshot recovering mark.
func (c *client) SetSnapshotRecoveringMark(ctx context.Context) error {
return c.request(ctx, newRequestInfo().
WithName(setSnapshotRecoveringMarkName).
WithURI(SnapshotRecoveringMark).
WithMethod(http.MethodPost))
}

// DeleteSnapshotRecoveringMark deletes the snapshot recovering mark.
func (c *client) DeleteSnapshotRecoveringMark(ctx context.Context) error {
return c.request(ctx, newRequestInfo().
WithName(deleteSnapshotRecoveringMarkName).
WithURI(SnapshotRecoveringMark).
WithMethod(http.MethodDelete))
}

// SetSchedulerDelay sets the delay of given scheduler.
func (c *client) SetSchedulerDelay(ctx context.Context, scheduler string, delaySec int64) error {
m := map[string]int64{
Expand Down
6 changes: 6 additions & 0 deletions client/http/request_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ const (
getStoresName = "GetStores"
getStoreName = "GetStore"
setStoreLabelsName = "SetStoreLabels"
getConfigName = "GetConfig"
setConfigName = "SetConfig"
getScheduleConfigName = "GetScheduleConfig"
setScheduleConfigName = "SetScheduleConfig"
getClusterVersionName = "GetClusterVersion"
Expand Down Expand Up @@ -69,6 +71,10 @@ const (
getMinResolvedTSByStoresIDsName = "GetMinResolvedTSByStoresIDs"
getMicroServiceMembersName = "GetMicroServiceMembers"
getPDVersionName = "GetPDVersion"
resetTSName = "ResetTS"
resetBaseAllocIDName = "ResetBaseAllocID"
setSnapshotRecoveringMarkName = "SetSnapshotRecoveringMark"
deleteSnapshotRecoveringMarkName = "DeleteSnapshotRecoveringMark"
)

type requestInfo struct {
Expand Down
40 changes: 40 additions & 0 deletions tests/integrations/client/http_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
pd "github.com/tikv/pd/client/http"
"github.com/tikv/pd/client/retry"
"github.com/tikv/pd/pkg/core"
sc "github.com/tikv/pd/pkg/schedule/config"
"github.com/tikv/pd/pkg/schedule/labeler"
"github.com/tikv/pd/pkg/schedule/placement"
"github.com/tikv/pd/pkg/utils/testutil"
Expand Down Expand Up @@ -405,6 +406,33 @@ func (suite *httpClientTestSuite) TestAccelerateSchedule() {
re.Len(suspectRegions, 2)
}

func (suite *httpClientTestSuite) TestConfig() {
re := suite.Require()
config, err := suite.client.GetConfig(suite.ctx)
re.NoError(err)
re.Equal(float64(4), config["schedule"].(map[string]interface{})["leader-schedule-limit"])

newConfig := map[string]interface{}{
"schedule.leader-schedule-limit": float64(8),
}
err = suite.client.SetConfig(suite.ctx, newConfig)
re.NoError(err)

config, err = suite.client.GetConfig(suite.ctx)
re.NoError(err)
re.Equal(float64(8), config["schedule"].(map[string]interface{})["leader-schedule-limit"])

// Test the config with TTL.
newConfig = map[string]interface{}{
"schedule.leader-schedule-limit": float64(16),
}
err = suite.client.SetConfig(suite.ctx, newConfig, 5)
re.NoError(err)
resp, err := suite.cluster.GetEtcdClient().Get(suite.ctx, sc.TTLConfigPrefix+"/schedule.leader-schedule-limit")
re.NoError(err)
re.Equal([]byte("16"), resp.Kvs[0].Value)
}

func (suite *httpClientTestSuite) TestScheduleConfig() {
re := suite.Require()
config, err := suite.client.GetScheduleConfig(suite.ctx)
Expand Down Expand Up @@ -501,6 +529,18 @@ func (suite *httpClientTestSuite) TestVersion() {
re.Equal(versioninfo.PDReleaseVersion, ver)
}

func (suite *httpClientTestSuite) TestAdmin() {
re := suite.Require()
err := suite.client.SetSnapshotRecoveringMark(suite.ctx)
re.NoError(err)
err = suite.client.ResetTS(suite.ctx, 123, true)
re.NoError(err)
err = suite.client.ResetBaseAllocID(suite.ctx, 456)
re.NoError(err)
err = suite.client.DeleteSnapshotRecoveringMark(suite.ctx)
re.NoError(err)
}

func (suite *httpClientTestSuite) TestWithBackoffer() {
re := suite.Require()
// Should return with 404 error without backoffer.
Expand Down

0 comments on commit 91bca92

Please sign in to comment.