Skip to content

Commit

Permalink
ctl: support new interface to acquire stores by state (tikv#7189)
Browse files Browse the repository at this point in the history
close tikv#7188

Signed-off-by: husharp <[email protected]>

Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com>
  • Loading branch information
HuSharp and ti-chi-bot[bot] authored Oct 11, 2023
1 parent 873212f commit 779b5be
Show file tree
Hide file tree
Showing 5 changed files with 129 additions and 12 deletions.
1 change: 1 addition & 0 deletions server/api/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,7 @@ func createRouter(prefix string, svr *server.Server) *mux.Router {
registerFunc(clusterRouter, "/stores/limit/scene", storesHandler.SetStoreLimitScene, setMethods(http.MethodPost), setAuditBackend(localLog, prometheus))
registerFunc(clusterRouter, "/stores/limit/scene", storesHandler.GetStoreLimitScene, setMethods(http.MethodGet), setAuditBackend(prometheus))
registerFunc(clusterRouter, "/stores/progress", storesHandler.GetStoresProgress, setMethods(http.MethodGet), setAuditBackend(prometheus))
registerFunc(clusterRouter, "/stores/check", storesHandler.GetStoresByState, setMethods(http.MethodGet), setAuditBackend(prometheus))

labelsHandler := newLabelsHandler(svr, rd)
registerFunc(clusterRouter, "/labels", labelsHandler.GetLabels, setMethods(http.MethodGet), setAuditBackend(prometheus))
Expand Down
55 changes: 54 additions & 1 deletion server/api/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/tikv/pd/pkg/core/storelimit"
"github.com/tikv/pd/pkg/errs"
sc "github.com/tikv/pd/pkg/schedule/config"
"github.com/tikv/pd/pkg/slice"
"github.com/tikv/pd/pkg/utils/apiutil"
"github.com/tikv/pd/pkg/utils/typeutil"
"github.com/tikv/pd/server"
Expand Down Expand Up @@ -740,6 +741,7 @@ func (h *storesHandler) GetStoresProgress(w http.ResponseWriter, r *http.Request
// @Success 200 {object} StoresInfo
// @Failure 500 {string} string "PD server failed to proceed the request."
// @Router /stores [get]
// @Deprecated Better to use /stores/check instead.
func (h *storesHandler) GetAllStores(w http.ResponseWriter, r *http.Request) {
rc := getCluster(r)
stores := rc.GetMetaStores()
Expand All @@ -753,7 +755,7 @@ func (h *storesHandler) GetAllStores(w http.ResponseWriter, r *http.Request) {
return
}

stores = urlFilter.filter(rc.GetMetaStores())
stores = urlFilter.filter(stores)
for _, s := range stores {
storeID := s.GetId()
store := rc.GetStore(storeID)
Expand All @@ -770,6 +772,57 @@ func (h *storesHandler) GetAllStores(w http.ResponseWriter, r *http.Request) {
h.rd.JSON(w, http.StatusOK, StoresInfo)
}

// @Tags store
// @Summary Get all stores by states in the cluster.
// @Param state query array true "Specify accepted store states."
// @Produce json
// @Success 200 {object} StoresInfo
// @Failure 500 {string} string "PD server failed to proceed the request."
// @Router /stores/check [get]
func (h *storesHandler) GetStoresByState(w http.ResponseWriter, r *http.Request) {
rc := getCluster(r)
stores := rc.GetMetaStores()
StoresInfo := &StoresInfo{
Stores: make([]*StoreInfo, 0, len(stores)),
}

lowerStateName := []string{strings.ToLower(downStateName), strings.ToLower(disconnectedName)}
for _, v := range metapb.StoreState_name {
lowerStateName = append(lowerStateName, strings.ToLower(v))
}

var queryStates []string
if v, ok := r.URL.Query()["state"]; ok {
for _, s := range v {
stateName := strings.ToLower(s)
if stateName != "" && !slice.Contains(lowerStateName, stateName) {
h.rd.JSON(w, http.StatusBadRequest, "unknown StoreState: "+s)
return
} else if stateName != "" {
queryStates = append(queryStates, stateName)
}
}
}

for _, s := range stores {
storeID := s.GetId()
store := rc.GetStore(storeID)
if store == nil {
h.rd.JSON(w, http.StatusInternalServerError, errs.ErrStoreNotFound.FastGenByArgs(storeID).Error())
return
}

storeInfo := newStoreInfo(h.GetScheduleConfig(), store)
if queryStates != nil && !slice.Contains(queryStates, strings.ToLower(storeInfo.Store.StateName)) {
continue
}
StoresInfo.Stores = append(StoresInfo.Stores, storeInfo)
}
StoresInfo.Count = len(StoresInfo.Stores)

h.rd.JSON(w, http.StatusOK, StoresInfo)
}

type storeStateFilter struct {
accepts []metapb.StoreState
}
Expand Down
52 changes: 50 additions & 2 deletions server/api/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/tikv/pd/pkg/core"
tu "github.com/tikv/pd/pkg/utils/testutil"
"github.com/tikv/pd/pkg/utils/typeutil"
"github.com/tikv/pd/pkg/versioninfo"
"github.com/tikv/pd/server"
"github.com/tikv/pd/server/config"
)
Expand Down Expand Up @@ -140,17 +141,64 @@ func (suite *storeTestSuite) TestStoresList() {
suite.NoError(err)
checkStoresInfo(re, info.Stores, suite.stores[:3])

url = fmt.Sprintf("%s/stores?state=0", suite.urlPrefix)
url = fmt.Sprintf("%s/stores/check?state=up", suite.urlPrefix)
info = new(StoresInfo)
err = tu.ReadGetJSON(re, testDialClient, url, info)
suite.NoError(err)
checkStoresInfo(re, info.Stores, suite.stores[:2])

url = fmt.Sprintf("%s/stores?state=1", suite.urlPrefix)
url = fmt.Sprintf("%s/stores/check?state=offline", suite.urlPrefix)
info = new(StoresInfo)
err = tu.ReadGetJSON(re, testDialClient, url, info)
suite.NoError(err)
checkStoresInfo(re, info.Stores, suite.stores[2:3])

url = fmt.Sprintf("%s/stores/check?state=tombstone", suite.urlPrefix)
info = new(StoresInfo)
err = tu.ReadGetJSON(re, testDialClient, url, info)
suite.NoError(err)
checkStoresInfo(re, info.Stores, suite.stores[3:])

url = fmt.Sprintf("%s/stores/check?state=tombstone&state=offline", suite.urlPrefix)
info = new(StoresInfo)
err = tu.ReadGetJSON(re, testDialClient, url, info)
suite.NoError(err)
checkStoresInfo(re, info.Stores, suite.stores[2:])

// down store
s := &server.GrpcServer{Server: suite.svr}
store := &metapb.Store{
Id: 100,
Address: fmt.Sprintf("tikv%d", 100),
State: metapb.StoreState_Up,
Version: versioninfo.MinSupportedVersion(versioninfo.Version2_0).String(),
LastHeartbeat: time.Now().UnixNano() - int64(1*time.Hour),
}
_, err = s.PutStore(context.Background(), &pdpb.PutStoreRequest{
Header: &pdpb.RequestHeader{ClusterId: suite.svr.ClusterID()},
Store: store,
})
re.NoError(err)

url = fmt.Sprintf("%s/stores/check?state=down", suite.urlPrefix)
info = new(StoresInfo)
err = tu.ReadGetJSON(re, testDialClient, url, info)
suite.NoError(err)
checkStoresInfo(re, info.Stores, []*metapb.Store{store})

// disconnect store
store.LastHeartbeat = time.Now().UnixNano() - int64(1*time.Minute)
_, err = s.PutStore(context.Background(), &pdpb.PutStoreRequest{
Header: &pdpb.RequestHeader{ClusterId: suite.svr.ClusterID()},
Store: store,
})
re.NoError(err)

url = fmt.Sprintf("%s/stores/check?state=disconnected", suite.urlPrefix)
info = new(StoresInfo)
err = tu.ReadGetJSON(re, testDialClient, url, info)
suite.NoError(err)
checkStoresInfo(re, info.Stores, []*metapb.Store{store})
}

func (suite *storeTestSuite) TestStoreGet() {
Expand Down
23 changes: 22 additions & 1 deletion tests/pdctl/store/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -327,7 +327,28 @@ func TestStore(t *testing.T) {
args = []string{"-u", pdAddr, "store", "check", "Invalid_State"}
output, err = pdctl.ExecuteCommand(cmd, args...)
re.NoError(err)
re.Contains(string(output), "Unknown state: Invalid_state")
re.Contains(string(output), "unknown StoreState: Invalid_state")

// Mock a disconnected store.
storeCheck := &metapb.Store{
Id: uint64(2000),
State: metapb.StoreState_Up,
NodeState: metapb.NodeState_Serving,
LastHeartbeat: time.Now().UnixNano() - int64(1*time.Minute),
}
tests.MustPutStore(re, cluster, storeCheck)
args = []string{"-u", pdAddr, "store", "check", "Disconnected"}
output, err = pdctl.ExecuteCommand(cmd, args...)
re.NoError(err)
re.Contains(string(output), "\"id\": 2000,")
// Mock a down store.
storeCheck.Id = uint64(2001)
storeCheck.LastHeartbeat = time.Now().UnixNano() - int64(1*time.Hour)
tests.MustPutStore(re, cluster, storeCheck)
args = []string{"-u", pdAddr, "store", "check", "Down"}
output, err = pdctl.ExecuteCommand(cmd, args...)
re.NoError(err)
re.Contains(string(output), "\"id\": 2001,")

// store cancel-delete <store_id> command
limit = leaderServer.GetRaftCluster().GetStoreLimitByType(1, storelimit.RemovePeer)
Expand Down
10 changes: 2 additions & 8 deletions tools/pd-ctl/pdctl/command/store_command.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ func NewStoreLimitCommand() *cobra.Command {
// NewStoreCheckCommand return a check subcommand of storeCmd
func NewStoreCheckCommand() *cobra.Command {
d := &cobra.Command{
Use: "check [up|offline|tombstone]",
Use: "check [up|offline|tombstone|disconnected|down]",
Short: "Check all the stores with specified status",
Run: storeCheckCommandFunc,
}
Expand Down Expand Up @@ -666,13 +666,7 @@ func storeCheckCommandFunc(cmd *cobra.Command, args []string) {

caser := cases.Title(language.Und)
state := caser.String(strings.ToLower(args[0]))
stateValue, ok := metapb.StoreState_value[state]
if !ok {
cmd.Println("Unknown state: " + state)
return
}

prefix := fmt.Sprintf("%s?state=%d", storesPrefix, stateValue)
prefix := fmt.Sprintf("%s/check?state=%s", storesPrefix, state)
r, err := doRequest(cmd, prefix, http.MethodGet, http.Header{})
if err != nil {
cmd.Printf("Failed to get store: %s\n", err)
Expand Down

0 comments on commit 779b5be

Please sign in to comment.