diff --git a/server/api/router.go b/server/api/router.go index f47f0e3ebf2..93811e264f1 100644 --- a/server/api/router.go +++ b/server/api/router.go @@ -226,6 +226,7 @@ func createRouter(prefix string, svr *server.Server) *mux.Router { registerFunc(clusterRouter, "/stores/limit/scene", storesHandler.SetStoreLimitScene, setMethods(http.MethodPost), setAuditBackend(localLog, prometheus)) registerFunc(clusterRouter, "/stores/limit/scene", storesHandler.GetStoreLimitScene, setMethods(http.MethodGet), setAuditBackend(prometheus)) registerFunc(clusterRouter, "/stores/progress", storesHandler.GetStoresProgress, setMethods(http.MethodGet), setAuditBackend(prometheus)) + registerFunc(clusterRouter, "/stores/check", storesHandler.GetStoresByState, setMethods(http.MethodGet), setAuditBackend(prometheus)) labelsHandler := newLabelsHandler(svr, rd) registerFunc(clusterRouter, "/labels", labelsHandler.GetLabels, setMethods(http.MethodGet), setAuditBackend(prometheus)) diff --git a/server/api/store.go b/server/api/store.go index 7c820a3befa..a44850d35cc 100644 --- a/server/api/store.go +++ b/server/api/store.go @@ -31,6 +31,7 @@ import ( "github.com/tikv/pd/pkg/core/storelimit" "github.com/tikv/pd/pkg/errs" sc "github.com/tikv/pd/pkg/schedule/config" + "github.com/tikv/pd/pkg/slice" "github.com/tikv/pd/pkg/utils/apiutil" "github.com/tikv/pd/pkg/utils/typeutil" "github.com/tikv/pd/server" @@ -740,6 +741,7 @@ func (h *storesHandler) GetStoresProgress(w http.ResponseWriter, r *http.Request // @Success 200 {object} StoresInfo // @Failure 500 {string} string "PD server failed to proceed the request." // @Router /stores [get] +// @Deprecated Better to use /stores/check instead. func (h *storesHandler) GetAllStores(w http.ResponseWriter, r *http.Request) { rc := getCluster(r) stores := rc.GetMetaStores() @@ -753,7 +755,7 @@ func (h *storesHandler) GetAllStores(w http.ResponseWriter, r *http.Request) { return } - stores = urlFilter.filter(rc.GetMetaStores()) + stores = urlFilter.filter(stores) for _, s := range stores { storeID := s.GetId() store := rc.GetStore(storeID) @@ -770,6 +772,57 @@ func (h *storesHandler) GetAllStores(w http.ResponseWriter, r *http.Request) { h.rd.JSON(w, http.StatusOK, StoresInfo) } +// @Tags store +// @Summary Get all stores by states in the cluster. +// @Param state query array true "Specify accepted store states." +// @Produce json +// @Success 200 {object} StoresInfo +// @Failure 500 {string} string "PD server failed to proceed the request." +// @Router /stores/check [get] +func (h *storesHandler) GetStoresByState(w http.ResponseWriter, r *http.Request) { + rc := getCluster(r) + stores := rc.GetMetaStores() + StoresInfo := &StoresInfo{ + Stores: make([]*StoreInfo, 0, len(stores)), + } + + lowerStateName := []string{strings.ToLower(downStateName), strings.ToLower(disconnectedName)} + for _, v := range metapb.StoreState_name { + lowerStateName = append(lowerStateName, strings.ToLower(v)) + } + + var queryStates []string + if v, ok := r.URL.Query()["state"]; ok { + for _, s := range v { + stateName := strings.ToLower(s) + if stateName != "" && !slice.Contains(lowerStateName, stateName) { + h.rd.JSON(w, http.StatusBadRequest, "unknown StoreState: "+s) + return + } else if stateName != "" { + queryStates = append(queryStates, stateName) + } + } + } + + for _, s := range stores { + storeID := s.GetId() + store := rc.GetStore(storeID) + if store == nil { + h.rd.JSON(w, http.StatusInternalServerError, errs.ErrStoreNotFound.FastGenByArgs(storeID).Error()) + return + } + + storeInfo := newStoreInfo(h.GetScheduleConfig(), store) + if queryStates != nil && !slice.Contains(queryStates, strings.ToLower(storeInfo.Store.StateName)) { + continue + } + StoresInfo.Stores = append(StoresInfo.Stores, storeInfo) + } + StoresInfo.Count = len(StoresInfo.Stores) + + h.rd.JSON(w, http.StatusOK, StoresInfo) +} + type storeStateFilter struct { accepts []metapb.StoreState } diff --git a/server/api/store_test.go b/server/api/store_test.go index 4bcdf1953a5..2b3a8dee9bb 100644 --- a/server/api/store_test.go +++ b/server/api/store_test.go @@ -32,6 +32,7 @@ import ( "github.com/tikv/pd/pkg/core" tu "github.com/tikv/pd/pkg/utils/testutil" "github.com/tikv/pd/pkg/utils/typeutil" + "github.com/tikv/pd/pkg/versioninfo" "github.com/tikv/pd/server" "github.com/tikv/pd/server/config" ) @@ -140,17 +141,64 @@ func (suite *storeTestSuite) TestStoresList() { suite.NoError(err) checkStoresInfo(re, info.Stores, suite.stores[:3]) - url = fmt.Sprintf("%s/stores?state=0", suite.urlPrefix) + url = fmt.Sprintf("%s/stores/check?state=up", suite.urlPrefix) info = new(StoresInfo) err = tu.ReadGetJSON(re, testDialClient, url, info) suite.NoError(err) checkStoresInfo(re, info.Stores, suite.stores[:2]) - url = fmt.Sprintf("%s/stores?state=1", suite.urlPrefix) + url = fmt.Sprintf("%s/stores/check?state=offline", suite.urlPrefix) info = new(StoresInfo) err = tu.ReadGetJSON(re, testDialClient, url, info) suite.NoError(err) checkStoresInfo(re, info.Stores, suite.stores[2:3]) + + url = fmt.Sprintf("%s/stores/check?state=tombstone", suite.urlPrefix) + info = new(StoresInfo) + err = tu.ReadGetJSON(re, testDialClient, url, info) + suite.NoError(err) + checkStoresInfo(re, info.Stores, suite.stores[3:]) + + url = fmt.Sprintf("%s/stores/check?state=tombstone&state=offline", suite.urlPrefix) + info = new(StoresInfo) + err = tu.ReadGetJSON(re, testDialClient, url, info) + suite.NoError(err) + checkStoresInfo(re, info.Stores, suite.stores[2:]) + + // down store + s := &server.GrpcServer{Server: suite.svr} + store := &metapb.Store{ + Id: 100, + Address: fmt.Sprintf("tikv%d", 100), + State: metapb.StoreState_Up, + Version: versioninfo.MinSupportedVersion(versioninfo.Version2_0).String(), + LastHeartbeat: time.Now().UnixNano() - int64(1*time.Hour), + } + _, err = s.PutStore(context.Background(), &pdpb.PutStoreRequest{ + Header: &pdpb.RequestHeader{ClusterId: suite.svr.ClusterID()}, + Store: store, + }) + re.NoError(err) + + url = fmt.Sprintf("%s/stores/check?state=down", suite.urlPrefix) + info = new(StoresInfo) + err = tu.ReadGetJSON(re, testDialClient, url, info) + suite.NoError(err) + checkStoresInfo(re, info.Stores, []*metapb.Store{store}) + + // disconnect store + store.LastHeartbeat = time.Now().UnixNano() - int64(1*time.Minute) + _, err = s.PutStore(context.Background(), &pdpb.PutStoreRequest{ + Header: &pdpb.RequestHeader{ClusterId: suite.svr.ClusterID()}, + Store: store, + }) + re.NoError(err) + + url = fmt.Sprintf("%s/stores/check?state=disconnected", suite.urlPrefix) + info = new(StoresInfo) + err = tu.ReadGetJSON(re, testDialClient, url, info) + suite.NoError(err) + checkStoresInfo(re, info.Stores, []*metapb.Store{store}) } func (suite *storeTestSuite) TestStoreGet() { diff --git a/tests/pdctl/store/store_test.go b/tests/pdctl/store/store_test.go index 13c7350bb6f..3400841b5ea 100644 --- a/tests/pdctl/store/store_test.go +++ b/tests/pdctl/store/store_test.go @@ -327,7 +327,28 @@ func TestStore(t *testing.T) { args = []string{"-u", pdAddr, "store", "check", "Invalid_State"} output, err = pdctl.ExecuteCommand(cmd, args...) re.NoError(err) - re.Contains(string(output), "Unknown state: Invalid_state") + re.Contains(string(output), "unknown StoreState: Invalid_state") + + // Mock a disconnected store. + storeCheck := &metapb.Store{ + Id: uint64(2000), + State: metapb.StoreState_Up, + NodeState: metapb.NodeState_Serving, + LastHeartbeat: time.Now().UnixNano() - int64(1*time.Minute), + } + tests.MustPutStore(re, cluster, storeCheck) + args = []string{"-u", pdAddr, "store", "check", "Disconnected"} + output, err = pdctl.ExecuteCommand(cmd, args...) + re.NoError(err) + re.Contains(string(output), "\"id\": 2000,") + // Mock a down store. + storeCheck.Id = uint64(2001) + storeCheck.LastHeartbeat = time.Now().UnixNano() - int64(1*time.Hour) + tests.MustPutStore(re, cluster, storeCheck) + args = []string{"-u", pdAddr, "store", "check", "Down"} + output, err = pdctl.ExecuteCommand(cmd, args...) + re.NoError(err) + re.Contains(string(output), "\"id\": 2001,") // store cancel-delete command limit = leaderServer.GetRaftCluster().GetStoreLimitByType(1, storelimit.RemovePeer) diff --git a/tools/pd-ctl/pdctl/command/store_command.go b/tools/pd-ctl/pdctl/command/store_command.go index 1dee1c13a72..357e527c2f4 100644 --- a/tools/pd-ctl/pdctl/command/store_command.go +++ b/tools/pd-ctl/pdctl/command/store_command.go @@ -143,7 +143,7 @@ func NewStoreLimitCommand() *cobra.Command { // NewStoreCheckCommand return a check subcommand of storeCmd func NewStoreCheckCommand() *cobra.Command { d := &cobra.Command{ - Use: "check [up|offline|tombstone]", + Use: "check [up|offline|tombstone|disconnected|down]", Short: "Check all the stores with specified status", Run: storeCheckCommandFunc, } @@ -666,13 +666,7 @@ func storeCheckCommandFunc(cmd *cobra.Command, args []string) { caser := cases.Title(language.Und) state := caser.String(strings.ToLower(args[0])) - stateValue, ok := metapb.StoreState_value[state] - if !ok { - cmd.Println("Unknown state: " + state) - return - } - - prefix := fmt.Sprintf("%s?state=%d", storesPrefix, stateValue) + prefix := fmt.Sprintf("%s/check?state=%s", storesPrefix, state) r, err := doRequest(cmd, prefix, http.MethodGet, http.Header{}) if err != nil { cmd.Printf("Failed to get store: %s\n", err)