Skip to content

Commit

Permalink
Merge branch 'master' into sche-redirect5
Browse files Browse the repository at this point in the history
  • Loading branch information
lhy1024 committed Oct 12, 2023
2 parents 29645d3 + 779b5be commit a768778
Show file tree
Hide file tree
Showing 11 changed files with 580 additions and 32 deletions.
3 changes: 1 addition & 2 deletions pkg/mcs/scheduling/server/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -476,8 +476,7 @@ func (c *Cluster) collectClusterMetrics() {
}

func (c *Cluster) resetMetrics() {
statsMap := statistics.NewStoreStatisticsMap(c.persistConfig)
statsMap.Reset()
statistics.Reset()

c.coordinator.GetSchedulersController().ResetSchedulerMetrics()
c.coordinator.ResetHotSpotMetrics()
Expand Down
11 changes: 8 additions & 3 deletions pkg/statistics/store_collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,6 @@ func (s *storeStatistics) Observe(store *core.StoreInfo) {
case metapb.NodeState_Removed:
s.Tombstone++
s.Removed++
s.resetStoreStatistics(storeAddress, id)
return
}

Expand Down Expand Up @@ -261,7 +260,8 @@ func (s *storeStatistics) Collect() {
}
}

func (s *storeStatistics) resetStoreStatistics(storeAddress string, id string) {
// ResetStoreStatistics resets the metrics of store.
func ResetStoreStatistics(storeAddress string, id string) {
metrics := []string{
"region_score",
"leader_score",
Expand All @@ -282,6 +282,10 @@ func (s *storeStatistics) resetStoreStatistics(storeAddress string, id string) {
"store_read_query_rate",
"store_regions_write_rate_bytes",
"store_regions_write_rate_keys",
"store_slow_trend_cause_value",
"store_slow_trend_cause_rate",
"store_slow_trend_result_value",
"store_slow_trend_result_rate",
}
for _, m := range metrics {
storeStatusGauge.DeleteLabelValues(storeAddress, id, m)
Expand Down Expand Up @@ -313,7 +317,8 @@ func (m *storeStatisticsMap) Collect() {
m.stats.Collect()
}

func (m *storeStatisticsMap) Reset() {
// Reset resets the metrics.
func Reset() {
storeStatusGauge.Reset()
clusterStatusGauge.Reset()
placementStatusGauge.Reset()
Expand Down
1 change: 1 addition & 0 deletions server/api/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,7 @@ func createRouter(prefix string, svr *server.Server) *mux.Router {
registerFunc(clusterRouter, "/stores/limit/scene", storesHandler.SetStoreLimitScene, setMethods(http.MethodPost), setAuditBackend(localLog, prometheus))
registerFunc(clusterRouter, "/stores/limit/scene", storesHandler.GetStoreLimitScene, setMethods(http.MethodGet), setAuditBackend(prometheus))
registerFunc(clusterRouter, "/stores/progress", storesHandler.GetStoresProgress, setMethods(http.MethodGet), setAuditBackend(prometheus))
registerFunc(clusterRouter, "/stores/check", storesHandler.GetStoresByState, setMethods(http.MethodGet), setAuditBackend(prometheus))

labelsHandler := newLabelsHandler(svr, rd)
registerFunc(clusterRouter, "/labels", labelsHandler.GetLabels, setMethods(http.MethodGet), setAuditBackend(prometheus))
Expand Down
55 changes: 54 additions & 1 deletion server/api/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/tikv/pd/pkg/core/storelimit"
"github.com/tikv/pd/pkg/errs"
sc "github.com/tikv/pd/pkg/schedule/config"
"github.com/tikv/pd/pkg/slice"
"github.com/tikv/pd/pkg/utils/apiutil"
"github.com/tikv/pd/pkg/utils/typeutil"
"github.com/tikv/pd/server"
Expand Down Expand Up @@ -740,6 +741,7 @@ func (h *storesHandler) GetStoresProgress(w http.ResponseWriter, r *http.Request
// @Success 200 {object} StoresInfo
// @Failure 500 {string} string "PD server failed to proceed the request."
// @Router /stores [get]
// @Deprecated Better to use /stores/check instead.
func (h *storesHandler) GetAllStores(w http.ResponseWriter, r *http.Request) {
rc := getCluster(r)
stores := rc.GetMetaStores()
Expand All @@ -753,7 +755,7 @@ func (h *storesHandler) GetAllStores(w http.ResponseWriter, r *http.Request) {
return
}

stores = urlFilter.filter(rc.GetMetaStores())
stores = urlFilter.filter(stores)
for _, s := range stores {
storeID := s.GetId()
store := rc.GetStore(storeID)
Expand All @@ -770,6 +772,57 @@ func (h *storesHandler) GetAllStores(w http.ResponseWriter, r *http.Request) {
h.rd.JSON(w, http.StatusOK, StoresInfo)
}

// @Tags store
// @Summary Get all stores by states in the cluster.
// @Param state query array true "Specify accepted store states."
// @Produce json
// @Success 200 {object} StoresInfo
// @Failure 500 {string} string "PD server failed to proceed the request."
// @Router /stores/check [get]
func (h *storesHandler) GetStoresByState(w http.ResponseWriter, r *http.Request) {
rc := getCluster(r)
stores := rc.GetMetaStores()
StoresInfo := &StoresInfo{
Stores: make([]*StoreInfo, 0, len(stores)),
}

lowerStateName := []string{strings.ToLower(downStateName), strings.ToLower(disconnectedName)}
for _, v := range metapb.StoreState_name {
lowerStateName = append(lowerStateName, strings.ToLower(v))
}

var queryStates []string
if v, ok := r.URL.Query()["state"]; ok {
for _, s := range v {
stateName := strings.ToLower(s)
if stateName != "" && !slice.Contains(lowerStateName, stateName) {
h.rd.JSON(w, http.StatusBadRequest, "unknown StoreState: "+s)
return
} else if stateName != "" {
queryStates = append(queryStates, stateName)
}
}
}

for _, s := range stores {
storeID := s.GetId()
store := rc.GetStore(storeID)
if store == nil {
h.rd.JSON(w, http.StatusInternalServerError, errs.ErrStoreNotFound.FastGenByArgs(storeID).Error())
return
}

storeInfo := newStoreInfo(h.GetScheduleConfig(), store)
if queryStates != nil && !slice.Contains(queryStates, strings.ToLower(storeInfo.Store.StateName)) {
continue
}
StoresInfo.Stores = append(StoresInfo.Stores, storeInfo)
}
StoresInfo.Count = len(StoresInfo.Stores)

h.rd.JSON(w, http.StatusOK, StoresInfo)
}

type storeStateFilter struct {
accepts []metapb.StoreState
}
Expand Down
52 changes: 50 additions & 2 deletions server/api/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/tikv/pd/pkg/core"
tu "github.com/tikv/pd/pkg/utils/testutil"
"github.com/tikv/pd/pkg/utils/typeutil"
"github.com/tikv/pd/pkg/versioninfo"
"github.com/tikv/pd/server"
"github.com/tikv/pd/server/config"
)
Expand Down Expand Up @@ -140,17 +141,64 @@ func (suite *storeTestSuite) TestStoresList() {
suite.NoError(err)
checkStoresInfo(re, info.Stores, suite.stores[:3])

url = fmt.Sprintf("%s/stores?state=0", suite.urlPrefix)
url = fmt.Sprintf("%s/stores/check?state=up", suite.urlPrefix)
info = new(StoresInfo)
err = tu.ReadGetJSON(re, testDialClient, url, info)
suite.NoError(err)
checkStoresInfo(re, info.Stores, suite.stores[:2])

url = fmt.Sprintf("%s/stores?state=1", suite.urlPrefix)
url = fmt.Sprintf("%s/stores/check?state=offline", suite.urlPrefix)
info = new(StoresInfo)
err = tu.ReadGetJSON(re, testDialClient, url, info)
suite.NoError(err)
checkStoresInfo(re, info.Stores, suite.stores[2:3])

url = fmt.Sprintf("%s/stores/check?state=tombstone", suite.urlPrefix)
info = new(StoresInfo)
err = tu.ReadGetJSON(re, testDialClient, url, info)
suite.NoError(err)
checkStoresInfo(re, info.Stores, suite.stores[3:])

url = fmt.Sprintf("%s/stores/check?state=tombstone&state=offline", suite.urlPrefix)
info = new(StoresInfo)
err = tu.ReadGetJSON(re, testDialClient, url, info)
suite.NoError(err)
checkStoresInfo(re, info.Stores, suite.stores[2:])

// down store
s := &server.GrpcServer{Server: suite.svr}
store := &metapb.Store{
Id: 100,
Address: fmt.Sprintf("tikv%d", 100),
State: metapb.StoreState_Up,
Version: versioninfo.MinSupportedVersion(versioninfo.Version2_0).String(),
LastHeartbeat: time.Now().UnixNano() - int64(1*time.Hour),
}
_, err = s.PutStore(context.Background(), &pdpb.PutStoreRequest{
Header: &pdpb.RequestHeader{ClusterId: suite.svr.ClusterID()},
Store: store,
})
re.NoError(err)

url = fmt.Sprintf("%s/stores/check?state=down", suite.urlPrefix)
info = new(StoresInfo)
err = tu.ReadGetJSON(re, testDialClient, url, info)
suite.NoError(err)
checkStoresInfo(re, info.Stores, []*metapb.Store{store})

// disconnect store
store.LastHeartbeat = time.Now().UnixNano() - int64(1*time.Minute)
_, err = s.PutStore(context.Background(), &pdpb.PutStoreRequest{
Header: &pdpb.RequestHeader{ClusterId: suite.svr.ClusterID()},
Store: store,
})
re.NoError(err)

url = fmt.Sprintf("%s/stores/check?state=disconnected", suite.urlPrefix)
info = new(StoresInfo)
err = tu.ReadGetJSON(re, testDialClient, url, info)
suite.NoError(err)
checkStoresInfo(re, info.Stores, []*metapb.Store{store})
}

func (suite *storeTestSuite) TestStoreGet() {
Expand Down
6 changes: 3 additions & 3 deletions server/apiv2/handlers/keyspace.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ func LoadKeyspace(c *gin.Context) {
// @Router /keyspaces/id/{id} [get]
func LoadKeyspaceByID(c *gin.Context) {
id, err := strconv.ParseUint(c.Param("id"), 10, 64)
if err != nil || id == 0 {
if err != nil {
c.AbortWithStatusJSON(http.StatusInternalServerError, "invalid keyspace id")
return
}
Expand All @@ -158,7 +158,7 @@ func LoadKeyspaceByID(c *gin.Context) {

// parseLoadAllQuery parses LoadAllKeyspaces'/GetKeyspaceGroups' query parameters.
// page_token:
// The keyspace/keyspace group id of the scan start. If not set, scan from keyspace/keyspace group with id 1.
// The keyspace/keyspace group id of the scan start. If not set, scan from keyspace/keyspace group with id 0.
// It's string of ID of the previous scan result's last element (next_page_token).
// limit:
// The maximum number of keyspace metas/keyspace groups to return. If not set, no limit is posed.
Expand All @@ -167,7 +167,7 @@ func LoadKeyspaceByID(c *gin.Context) {
func parseLoadAllQuery(c *gin.Context) (scanStart uint32, scanLimit int, err error) {
pageToken, set := c.GetQuery("page_token")
if !set || pageToken == "" {
// If pageToken is empty or unset, then scan from ID of 1.
// If pageToken is empty or unset, then scan from ID of 0.
scanStart = 0
} else {
scanStart64, err := strconv.ParseUint(pageToken, 10, 32)
Expand Down
8 changes: 5 additions & 3 deletions server/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -1625,7 +1625,10 @@ func (c *RaftCluster) BuryStore(storeID uint64, forceBury bool) error {
// clean up the residual information.
delete(c.prevStoreLimit, storeID)
c.RemoveStoreLimit(storeID)
c.resetProgress(storeID, store.GetAddress())
addr := store.GetAddress()
c.resetProgress(storeID, addr)
storeIDStr := strconv.FormatUint(storeID, 10)
statistics.ResetStoreStatistics(addr, storeIDStr)
if !c.isAPIServiceMode {
c.hotStat.RemoveRollingStoreStats(storeID)
c.slowStat.RemoveSlowStoreStatus(storeID)
Expand Down Expand Up @@ -2169,8 +2172,7 @@ func (c *RaftCluster) collectMetrics() {
}

func (c *RaftCluster) resetMetrics() {
statsMap := statistics.NewStoreStatisticsMap(c.opt)
statsMap.Reset()
statistics.Reset()

if !c.isAPIServiceMode {
c.coordinator.GetSchedulersController().ResetSchedulerMetrics()
Expand Down
Loading

0 comments on commit a768778

Please sign in to comment.