From e26a4f7292280345d5813d7a1990a77a8785c0d6 Mon Sep 17 00:00:00 2001 From: lance6716 Date: Wed, 13 Dec 2023 17:46:48 +0800 Subject: [PATCH] client/http: add more API for lightning's usage, and don't use body io.Reader (#7534) ref tikv/pd#7300 Signed-off-by: lance6716 Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com> --- client/http/client.go | 124 ++++++++++++------ tests/integrations/client/http_client_test.go | 11 ++ 2 files changed, 95 insertions(+), 40 deletions(-) diff --git a/client/http/client.go b/client/http/client.go index 958c52489fb..d74c77571d6 100644 --- a/client/http/client.go +++ b/client/http/client.go @@ -55,6 +55,7 @@ type Client interface { GetHistoryHotRegions(context.Context, *HistoryHotRegionsRequest) (*HistoryHotRegions, error) GetRegionStatusByKeyRange(context.Context, *KeyRange, bool) (*RegionStats, error) GetStores(context.Context) (*StoresInfo, error) + GetStore(context.Context, uint64) (*StoreInfo, error) SetStoreLabels(context.Context, int64, map[string]string) error GetMembers(context.Context) (*MembersInfo, error) GetLeader(context.Context) (*pdpb.Member, error) @@ -62,9 +63,11 @@ type Client interface { /* Config-related interfaces */ GetScheduleConfig(context.Context) (map[string]interface{}, error) SetScheduleConfig(context.Context, map[string]interface{}) error + GetClusterVersion(context.Context) (string, error) /* Scheduler-related interfaces */ GetSchedulers(context.Context) ([]string, error) CreateScheduler(ctx context.Context, name string, storeID uint64) error + SetSchedulerDelay(context.Context, string, int64) error /* Rule-related interfaces */ GetAllPlacementRuleBundles(context.Context) ([]*GroupBundle, error) GetPlacementRuleBundleByGroup(context.Context, string) (*GroupBundle, error) @@ -247,7 +250,7 @@ func WithAllowFollowerHandle() HeaderOption { func (c *client) requestWithRetry( ctx context.Context, name, uri, method string, - body io.Reader, res interface{}, + body []byte, res interface{}, headerOpts ...HeaderOption, ) error { var ( @@ -269,7 +272,7 @@ func (c *client) requestWithRetry( func (c *client) request( ctx context.Context, name, url, method string, - body io.Reader, res interface{}, + body []byte, res interface{}, headerOpts ...HeaderOption, ) error { logFields := []zap.Field{ @@ -279,7 +282,7 @@ func (c *client) request( zap.String("caller-id", c.callerID), } log.Debug("[pd] request the http url", logFields...) - req, err := http.NewRequestWithContext(ctx, method, url, body) + req, err := http.NewRequestWithContext(ctx, method, url, bytes.NewBuffer(body)) if err != nil { log.Error("[pd] create http request failed", append(logFields, zap.Error(err))...) return errors.Trace(err) @@ -341,7 +344,7 @@ func (c *client) GetRegionByID(ctx context.Context, regionID uint64) (*RegionInf var region RegionInfo err := c.requestWithRetry(ctx, "GetRegionByID", RegionByID(regionID), - http.MethodGet, http.NoBody, ®ion) + http.MethodGet, nil, ®ion) if err != nil { return nil, err } @@ -353,7 +356,7 @@ func (c *client) GetRegionByKey(ctx context.Context, key []byte) (*RegionInfo, e var region RegionInfo err := c.requestWithRetry(ctx, "GetRegionByKey", RegionByKey(key), - http.MethodGet, http.NoBody, ®ion) + http.MethodGet, nil, ®ion) if err != nil { return nil, err } @@ -365,7 +368,7 @@ func (c *client) GetRegions(ctx context.Context) (*RegionsInfo, error) { var regions RegionsInfo err := c.requestWithRetry(ctx, "GetRegions", Regions, - http.MethodGet, http.NoBody, ®ions) + http.MethodGet, nil, ®ions) if err != nil { return nil, err } @@ -378,7 +381,7 @@ func (c *client) GetRegionsByKeyRange(ctx context.Context, keyRange *KeyRange, l var regions RegionsInfo err := c.requestWithRetry(ctx, "GetRegionsByKeyRange", RegionsByKeyRange(keyRange, limit), - http.MethodGet, http.NoBody, ®ions) + http.MethodGet, nil, ®ions) if err != nil { return nil, err } @@ -390,7 +393,7 @@ func (c *client) GetRegionsByStoreID(ctx context.Context, storeID uint64) (*Regi var regions RegionsInfo err := c.requestWithRetry(ctx, "GetRegionsByStoreID", RegionsByStoreID(storeID), - http.MethodGet, http.NoBody, ®ions) + http.MethodGet, nil, ®ions) if err != nil { return nil, err } @@ -403,7 +406,7 @@ func (c *client) GetRegionsReplicatedStateByKeyRange(ctx context.Context, keyRan var state string err := c.requestWithRetry(ctx, "GetRegionsReplicatedStateByKeyRange", RegionsReplicatedByKeyRange(keyRange), - http.MethodGet, http.NoBody, &state) + http.MethodGet, nil, &state) if err != nil { return "", err } @@ -415,7 +418,7 @@ func (c *client) GetHotReadRegions(ctx context.Context) (*StoreHotPeersInfos, er var hotReadRegions StoreHotPeersInfos err := c.requestWithRetry(ctx, "GetHotReadRegions", HotRead, - http.MethodGet, http.NoBody, &hotReadRegions) + http.MethodGet, nil, &hotReadRegions) if err != nil { return nil, err } @@ -427,7 +430,7 @@ func (c *client) GetHotWriteRegions(ctx context.Context) (*StoreHotPeersInfos, e var hotWriteRegions StoreHotPeersInfos err := c.requestWithRetry(ctx, "GetHotWriteRegions", HotWrite, - http.MethodGet, http.NoBody, &hotWriteRegions) + http.MethodGet, nil, &hotWriteRegions) if err != nil { return nil, err } @@ -443,7 +446,7 @@ func (c *client) GetHistoryHotRegions(ctx context.Context, req *HistoryHotRegion var historyHotRegions HistoryHotRegions err = c.requestWithRetry(ctx, "GetHistoryHotRegions", HotHistory, - http.MethodGet, bytes.NewBuffer(reqJSON), &historyHotRegions, + http.MethodGet, reqJSON, &historyHotRegions, WithAllowFollowerHandle()) if err != nil { return nil, err @@ -458,7 +461,7 @@ func (c *client) GetRegionStatusByKeyRange(ctx context.Context, keyRange *KeyRan var regionStats RegionStats err := c.requestWithRetry(ctx, "GetRegionStatusByKeyRange", RegionStatsByKeyRange(keyRange, onlyCount), - http.MethodGet, http.NoBody, ®ionStats, + http.MethodGet, nil, ®ionStats, ) if err != nil { return nil, err @@ -473,14 +476,14 @@ func (c *client) SetStoreLabels(ctx context.Context, storeID int64, storeLabels return errors.Trace(err) } return c.requestWithRetry(ctx, "SetStoreLabel", LabelByStoreID(storeID), - http.MethodPost, bytes.NewBuffer(jsonInput), nil) + http.MethodPost, jsonInput, nil) } func (c *client) GetMembers(ctx context.Context) (*MembersInfo, error) { var members MembersInfo err := c.requestWithRetry(ctx, "GetMembers", membersPrefix, - http.MethodGet, http.NoBody, &members) + http.MethodGet, nil, &members) if err != nil { return nil, err } @@ -491,7 +494,7 @@ func (c *client) GetMembers(ctx context.Context) (*MembersInfo, error) { func (c *client) GetLeader(ctx context.Context) (*pdpb.Member, error) { var leader pdpb.Member err := c.requestWithRetry(ctx, "GetLeader", leaderPrefix, - http.MethodGet, http.NoBody, &leader) + http.MethodGet, nil, &leader) if err != nil { return nil, err } @@ -501,7 +504,7 @@ func (c *client) GetLeader(ctx context.Context) (*pdpb.Member, error) { // TransferLeader transfers the PD leader. func (c *client) TransferLeader(ctx context.Context, newLeader string) error { return c.requestWithRetry(ctx, "TransferLeader", TransferLeaderByID(newLeader), - http.MethodPost, http.NoBody, nil) + http.MethodPost, nil, nil) } // GetScheduleConfig gets the schedule configurations. @@ -509,7 +512,7 @@ func (c *client) GetScheduleConfig(ctx context.Context) (map[string]interface{}, var config map[string]interface{} err := c.requestWithRetry(ctx, "GetScheduleConfig", ScheduleConfig, - http.MethodGet, http.NoBody, &config) + http.MethodGet, nil, &config) if err != nil { return nil, err } @@ -524,7 +527,7 @@ func (c *client) SetScheduleConfig(ctx context.Context, config map[string]interf } return c.requestWithRetry(ctx, "SetScheduleConfig", ScheduleConfig, - http.MethodPost, bytes.NewBuffer(configJSON), nil) + http.MethodPost, configJSON, nil) } // GetStores gets the stores info. @@ -532,19 +535,43 @@ func (c *client) GetStores(ctx context.Context) (*StoresInfo, error) { var stores StoresInfo err := c.requestWithRetry(ctx, "GetStores", Stores, - http.MethodGet, http.NoBody, &stores) + http.MethodGet, nil, &stores) if err != nil { return nil, err } return &stores, nil } +// GetStore gets the store info by ID. +func (c *client) GetStore(ctx context.Context, storeID uint64) (*StoreInfo, error) { + var store StoreInfo + err := c.requestWithRetry(ctx, + "GetStore", StoreByID(storeID), + http.MethodGet, nil, &store) + if err != nil { + return nil, err + } + return &store, nil +} + +// GetClusterVersion gets the cluster version. +func (c *client) GetClusterVersion(ctx context.Context) (string, error) { + var version string + err := c.requestWithRetry(ctx, + "GetClusterVersion", ClusterVersion, + http.MethodGet, nil, &version) + if err != nil { + return "", err + } + return version, nil +} + // GetAllPlacementRuleBundles gets all placement rules bundles. func (c *client) GetAllPlacementRuleBundles(ctx context.Context) ([]*GroupBundle, error) { var bundles []*GroupBundle err := c.requestWithRetry(ctx, "GetPlacementRuleBundle", PlacementRuleBundle, - http.MethodGet, http.NoBody, &bundles) + http.MethodGet, nil, &bundles) if err != nil { return nil, err } @@ -556,7 +583,7 @@ func (c *client) GetPlacementRuleBundleByGroup(ctx context.Context, group string var bundle GroupBundle err := c.requestWithRetry(ctx, "GetPlacementRuleBundleByGroup", PlacementRuleBundleByGroup(group), - http.MethodGet, http.NoBody, &bundle) + http.MethodGet, nil, &bundle) if err != nil { return nil, err } @@ -568,7 +595,7 @@ func (c *client) GetPlacementRulesByGroup(ctx context.Context, group string) ([] var rules []*Rule err := c.requestWithRetry(ctx, "GetPlacementRulesByGroup", PlacementRulesByGroup(group), - http.MethodGet, http.NoBody, &rules) + http.MethodGet, nil, &rules) if err != nil { return nil, err } @@ -583,7 +610,7 @@ func (c *client) SetPlacementRule(ctx context.Context, rule *Rule) error { } return c.requestWithRetry(ctx, "SetPlacementRule", PlacementRule, - http.MethodPost, bytes.NewBuffer(ruleJSON), nil) + http.MethodPost, ruleJSON, nil) } // SetPlacementRuleInBatch sets the placement rules in batch. @@ -594,7 +621,7 @@ func (c *client) SetPlacementRuleInBatch(ctx context.Context, ruleOps []*RuleOp) } return c.requestWithRetry(ctx, "SetPlacementRuleInBatch", PlacementRulesInBatch, - http.MethodPost, bytes.NewBuffer(ruleOpsJSON), nil) + http.MethodPost, ruleOpsJSON, nil) } // SetPlacementRuleBundles sets the placement rule bundles. @@ -606,14 +633,14 @@ func (c *client) SetPlacementRuleBundles(ctx context.Context, bundles []*GroupBu } return c.requestWithRetry(ctx, "SetPlacementRuleBundles", PlacementRuleBundleWithPartialParameter(partial), - http.MethodPost, bytes.NewBuffer(bundlesJSON), nil) + http.MethodPost, bundlesJSON, nil) } // DeletePlacementRule deletes the placement rule. func (c *client) DeletePlacementRule(ctx context.Context, group, id string) error { return c.requestWithRetry(ctx, "DeletePlacementRule", PlacementRuleByGroupAndID(group, id), - http.MethodDelete, http.NoBody, nil) + http.MethodDelete, nil, nil) } // GetAllPlacementRuleGroups gets all placement rule groups. @@ -621,7 +648,7 @@ func (c *client) GetAllPlacementRuleGroups(ctx context.Context) ([]*RuleGroup, e var ruleGroups []*RuleGroup err := c.requestWithRetry(ctx, "GetAllPlacementRuleGroups", placementRuleGroups, - http.MethodGet, http.NoBody, &ruleGroups) + http.MethodGet, nil, &ruleGroups) if err != nil { return nil, err } @@ -633,7 +660,7 @@ func (c *client) GetPlacementRuleGroupByID(ctx context.Context, id string) (*Rul var ruleGroup RuleGroup err := c.requestWithRetry(ctx, "GetPlacementRuleGroupByID", PlacementRuleGroupByID(id), - http.MethodGet, http.NoBody, &ruleGroup) + http.MethodGet, nil, &ruleGroup) if err != nil { return nil, err } @@ -648,14 +675,14 @@ func (c *client) SetPlacementRuleGroup(ctx context.Context, ruleGroup *RuleGroup } return c.requestWithRetry(ctx, "SetPlacementRuleGroup", placementRuleGroup, - http.MethodPost, bytes.NewBuffer(ruleGroupJSON), nil) + http.MethodPost, ruleGroupJSON, nil) } // DeletePlacementRuleGroupByID deletes the placement rule group by ID. func (c *client) DeletePlacementRuleGroupByID(ctx context.Context, id string) error { return c.requestWithRetry(ctx, "DeletePlacementRuleGroupByID", PlacementRuleGroupByID(id), - http.MethodDelete, http.NoBody, nil) + http.MethodDelete, nil, nil) } // GetAllRegionLabelRules gets all region label rules. @@ -663,7 +690,7 @@ func (c *client) GetAllRegionLabelRules(ctx context.Context) ([]*LabelRule, erro var labelRules []*LabelRule err := c.requestWithRetry(ctx, "GetAllRegionLabelRules", RegionLabelRules, - http.MethodGet, http.NoBody, &labelRules) + http.MethodGet, nil, &labelRules) if err != nil { return nil, err } @@ -679,7 +706,7 @@ func (c *client) GetRegionLabelRulesByIDs(ctx context.Context, ruleIDs []string) var labelRules []*LabelRule err = c.requestWithRetry(ctx, "GetRegionLabelRulesByIDs", RegionLabelRulesByIDs, - http.MethodGet, bytes.NewBuffer(idsJSON), &labelRules) + http.MethodGet, idsJSON, &labelRules) if err != nil { return nil, err } @@ -694,7 +721,7 @@ func (c *client) SetRegionLabelRule(ctx context.Context, labelRule *LabelRule) e } return c.requestWithRetry(ctx, "SetRegionLabelRule", RegionLabelRule, - http.MethodPost, bytes.NewBuffer(labelRuleJSON), nil) + http.MethodPost, labelRuleJSON, nil) } // PatchRegionLabelRules patches the region label rules. @@ -705,14 +732,14 @@ func (c *client) PatchRegionLabelRules(ctx context.Context, labelRulePatch *Labe } return c.requestWithRetry(ctx, "PatchRegionLabelRules", RegionLabelRules, - http.MethodPatch, bytes.NewBuffer(labelRulePatchJSON), nil) + http.MethodPatch, labelRulePatchJSON, nil) } // GetSchedulers gets the schedulers from PD cluster. func (c *client) GetSchedulers(ctx context.Context) ([]string, error) { var schedulers []string err := c.requestWithRetry(ctx, "GetSchedulers", Schedulers, - http.MethodGet, http.NoBody, &schedulers) + http.MethodGet, nil, &schedulers) if err != nil { return nil, err } @@ -730,7 +757,7 @@ func (c *client) CreateScheduler(ctx context.Context, name string, storeID uint6 } return c.requestWithRetry(ctx, "CreateScheduler", Schedulers, - http.MethodPost, bytes.NewBuffer(inputJSON), nil) + http.MethodPost, inputJSON, nil) } // AccelerateSchedule accelerates the scheduling of the regions within the given key range. @@ -746,7 +773,7 @@ func (c *client) AccelerateSchedule(ctx context.Context, keyRange *KeyRange) err } return c.requestWithRetry(ctx, "AccelerateSchedule", AccelerateSchedule, - http.MethodPost, bytes.NewBuffer(inputJSON), nil) + http.MethodPost, inputJSON, nil) } // AccelerateScheduleInBatch accelerates the scheduling of the regions within the given key ranges in batch. @@ -766,10 +793,27 @@ func (c *client) AccelerateScheduleInBatch(ctx context.Context, keyRanges []*Key } return c.requestWithRetry(ctx, "AccelerateScheduleInBatch", AccelerateScheduleInBatch, - http.MethodPost, bytes.NewBuffer(inputJSON), nil) + http.MethodPost, inputJSON, nil) +} + +// SetSchedulerDelay sets the delay of given scheduler. +func (c *client) SetSchedulerDelay(ctx context.Context, scheduler string, delaySec int64) error { + m := map[string]int64{ + "delay": delaySec, + } + inputJSON, err := json.Marshal(m) + if err != nil { + return errors.Trace(err) + } + return c.requestWithRetry(ctx, + "SetSchedulerDelay", SchedulerByName(scheduler), + http.MethodPost, inputJSON, nil) } // GetMinResolvedTSByStoresIDs get min-resolved-ts by stores IDs. +// - When storeIDs has zero length, it will return (cluster-level's min_resolved_ts, nil, nil) when no error. +// - When storeIDs is {"cluster"}, it will return (cluster-level's min_resolved_ts, stores_min_resolved_ts, nil) when no error. +// - When storeID is specified to ID lists, it will return (min_resolved_ts of given stores, stores_min_resolved_ts, nil) when no error. func (c *client) GetMinResolvedTSByStoresIDs(ctx context.Context, storeIDs []uint64) (uint64, map[uint64]uint64, error) { uri := MinResolvedTSPrefix // scope is an optional parameter, it can be `cluster` or specified store IDs. @@ -791,7 +835,7 @@ func (c *client) GetMinResolvedTSByStoresIDs(ctx context.Context, storeIDs []uin }{} err := c.requestWithRetry(ctx, "GetMinResolvedTSByStoresIDs", uri, - http.MethodGet, http.NoBody, &resp) + http.MethodGet, nil, &resp) if err != nil { return 0, nil, err } diff --git a/tests/integrations/client/http_client_test.go b/tests/integrations/client/http_client_test.go index 476b4d2f541..7c8f66f4826 100644 --- a/tests/integrations/client/http_client_test.go +++ b/tests/integrations/client/http_client_test.go @@ -134,6 +134,13 @@ func (suite *httpClientTestSuite) TestMeta() { re.NoError(err) re.Equal(1, store.Count) re.Len(store.Stores, 1) + storeID := uint64(store.Stores[0].Store.ID) // TODO: why type is different? + store2, err := suite.client.GetStore(suite.ctx, storeID) + re.NoError(err) + re.EqualValues(storeID, store2.Store.ID) + version, err := suite.client.GetClusterVersion(suite.ctx) + re.NoError(err) + re.Equal("0.0.0", version) } func (suite *httpClientTestSuite) TestGetMinResolvedTSByStoresIDs() { @@ -396,6 +403,10 @@ func (suite *httpClientTestSuite) TestSchedulers() { schedulers, err = suite.client.GetSchedulers(suite.ctx) re.NoError(err) re.Len(schedulers, 1) + err = suite.client.SetSchedulerDelay(suite.ctx, "evict-leader-scheduler", 100) + re.NoError(err) + err = suite.client.SetSchedulerDelay(suite.ctx, "not-exist", 100) + re.ErrorContains(err, "500 Internal Server Error") // TODO: should return friendly error message } func (suite *httpClientTestSuite) TestSetStoreLabels() {