From c7b68a2e0e375d2aecd24ee0ad5bfe8e1481628c Mon Sep 17 00:00:00 2001 From: Ryan Leung Date: Tue, 10 Oct 2023 17:40:55 +0800 Subject: [PATCH 1/4] *: delete slow stats after store tombstone (#7181) close tikv/pd#7180 Signed-off-by: Ryan Leung Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com> --- pkg/statistics/store_collection.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/pkg/statistics/store_collection.go b/pkg/statistics/store_collection.go index 3f01a1d5171..3e53e423084 100644 --- a/pkg/statistics/store_collection.go +++ b/pkg/statistics/store_collection.go @@ -282,6 +282,10 @@ func (s *storeStatistics) resetStoreStatistics(storeAddress string, id string) { "store_read_query_rate", "store_regions_write_rate_bytes", "store_regions_write_rate_keys", + "store_slow_trend_cause_value", + "store_slow_trend_cause_rate", + "store_slow_trend_result_value", + "store_slow_trend_result_rate", } for _, m := range metrics { storeStatusGauge.DeleteLabelValues(storeAddress, id, m) From 92858b774498a40d7ec7991879e3331cd7f1fc27 Mon Sep 17 00:00:00 2001 From: Ryan Leung Date: Wed, 11 Oct 2023 11:42:25 +0800 Subject: [PATCH 2/4] *: remove store statistics after store tombstone (#7186) close tikv/pd#7187 Signed-off-by: Ryan Leung Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com> --- pkg/mcs/scheduling/server/cluster.go | 3 +-- pkg/statistics/store_collection.go | 7 ++++--- server/cluster/cluster.go | 8 +++++--- 3 files changed, 10 insertions(+), 8 deletions(-) diff --git a/pkg/mcs/scheduling/server/cluster.go b/pkg/mcs/scheduling/server/cluster.go index a6a3de11d53..e8489fdaa15 100644 --- a/pkg/mcs/scheduling/server/cluster.go +++ b/pkg/mcs/scheduling/server/cluster.go @@ -470,8 +470,7 @@ func (c *Cluster) collectClusterMetrics() { } func (c *Cluster) resetMetrics() { - statsMap := statistics.NewStoreStatisticsMap(c.persistConfig) - statsMap.Reset() + statistics.Reset() 
c.coordinator.GetSchedulersController().ResetSchedulerMetrics() c.coordinator.ResetHotSpotMetrics() diff --git a/pkg/statistics/store_collection.go b/pkg/statistics/store_collection.go index 3e53e423084..dcdd77d9112 100644 --- a/pkg/statistics/store_collection.go +++ b/pkg/statistics/store_collection.go @@ -102,7 +102,6 @@ func (s *storeStatistics) Observe(store *core.StoreInfo) { case metapb.NodeState_Removed: s.Tombstone++ s.Removed++ - s.resetStoreStatistics(storeAddress, id) return } @@ -261,7 +260,8 @@ func (s *storeStatistics) Collect() { } } -func (s *storeStatistics) resetStoreStatistics(storeAddress string, id string) { +// ResetStoreStatistics resets the metrics of store. +func ResetStoreStatistics(storeAddress string, id string) { metrics := []string{ "region_score", "leader_score", @@ -317,7 +317,8 @@ func (m *storeStatisticsMap) Collect() { m.stats.Collect() } -func (m *storeStatisticsMap) Reset() { +// Reset resets the metrics. +func Reset() { storeStatusGauge.Reset() clusterStatusGauge.Reset() placementStatusGauge.Reset() diff --git a/server/cluster/cluster.go b/server/cluster/cluster.go index 55f67f223cd..332a8b27a3f 100644 --- a/server/cluster/cluster.go +++ b/server/cluster/cluster.go @@ -1625,7 +1625,10 @@ func (c *RaftCluster) BuryStore(storeID uint64, forceBury bool) error { // clean up the residual information. 
delete(c.prevStoreLimit, storeID) c.RemoveStoreLimit(storeID) - c.resetProgress(storeID, store.GetAddress()) + addr := store.GetAddress() + c.resetProgress(storeID, addr) + storeIDStr := strconv.FormatUint(storeID, 10) + statistics.ResetStoreStatistics(addr, storeIDStr) if !c.isAPIServiceMode { c.hotStat.RemoveRollingStoreStats(storeID) c.slowStat.RemoveSlowStoreStatus(storeID) @@ -2169,8 +2172,7 @@ func (c *RaftCluster) collectMetrics() { } func (c *RaftCluster) resetMetrics() { - statsMap := statistics.NewStoreStatisticsMap(c.opt) - statsMap.Reset() + statistics.Reset() if !c.isAPIServiceMode { c.coordinator.GetSchedulersController().ResetSchedulerMetrics() From 873212fc872270adcba3cb9bdd1f3a0131845415 Mon Sep 17 00:00:00 2001 From: David <8039876+AmoebaProtozoa@users.noreply.github.com> Date: Tue, 10 Oct 2023 23:12:25 -0500 Subject: [PATCH 3/4] pd-ctl: add keyspace commands (#7158) ref tikv/pd#4399 Signed-off-by: AmoebaProtozoa <8039876+AmoebaProtozoa@users.noreply.github.com> Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com> --- server/apiv2/handlers/keyspace.go | 6 +- tests/pdctl/keyspace/keyspace_test.go | 186 ++++++++++++- .../pd-ctl/pdctl/command/keyspace_command.go | 257 +++++++++++++++++- 3 files changed, 437 insertions(+), 12 deletions(-) diff --git a/server/apiv2/handlers/keyspace.go b/server/apiv2/handlers/keyspace.go index b93dc84faf8..9602cc863ef 100644 --- a/server/apiv2/handlers/keyspace.go +++ b/server/apiv2/handlers/keyspace.go @@ -138,7 +138,7 @@ func LoadKeyspace(c *gin.Context) { // @Router /keyspaces/id/{id} [get] func LoadKeyspaceByID(c *gin.Context) { id, err := strconv.ParseUint(c.Param("id"), 10, 64) - if err != nil || id == 0 { + if err != nil { c.AbortWithStatusJSON(http.StatusInternalServerError, "invalid keyspace id") return } @@ -158,7 +158,7 @@ func LoadKeyspaceByID(c *gin.Context) { // parseLoadAllQuery parses LoadAllKeyspaces'/GetKeyspaceGroups' query parameters. 
// page_token: -// The keyspace/keyspace group id of the scan start. If not set, scan from keyspace/keyspace group with id 1. +// The keyspace/keyspace group id of the scan start. If not set, scan from keyspace/keyspace group with id 0. // It's string of ID of the previous scan result's last element (next_page_token). // limit: // The maximum number of keyspace metas/keyspace groups to return. If not set, no limit is posed. @@ -167,7 +167,7 @@ func LoadKeyspaceByID(c *gin.Context) { func parseLoadAllQuery(c *gin.Context) (scanStart uint32, scanLimit int, err error) { pageToken, set := c.GetQuery("page_token") if !set || pageToken == "" { - // If pageToken is empty or unset, then scan from ID of 1. + // If pageToken is empty or unset, then scan from ID of 0. scanStart = 0 } else { scanStart64, err := strconv.ParseUint(pageToken, 10, 32) diff --git a/tests/pdctl/keyspace/keyspace_test.go b/tests/pdctl/keyspace/keyspace_test.go index 57acdc86c70..805a30e6f18 100644 --- a/tests/pdctl/keyspace/keyspace_test.go +++ b/tests/pdctl/keyspace/keyspace_test.go @@ -18,11 +18,14 @@ import ( "context" "encoding/json" "fmt" + "strconv" "strings" "testing" "github.com/pingcap/failpoint" + "github.com/pingcap/kvproto/pkg/keyspacepb" "github.com/stretchr/testify/require" + "github.com/stretchr/testify/suite" "github.com/tikv/pd/pkg/keyspace" "github.com/tikv/pd/pkg/mcs/utils" "github.com/tikv/pd/pkg/utils/testutil" @@ -65,7 +68,7 @@ func TestKeyspace(t *testing.T) { var k api.KeyspaceMeta keyspaceName := "keyspace_1" testutil.Eventually(re, func() bool { - args := []string{"-u", pdAddr, "keyspace", keyspaceName} + args := []string{"-u", pdAddr, "keyspace", "show", "name", keyspaceName} output, err := pdctl.ExecuteCommand(cmd, args...) re.NoError(err) re.NoError(json.Unmarshal(output, &k)) @@ -85,7 +88,7 @@ func TestKeyspace(t *testing.T) { // check keyspace group in keyspace whether changed. 
testutil.Eventually(re, func() bool { - args := []string{"-u", pdAddr, "keyspace", keyspaceName} + args := []string{"-u", pdAddr, "keyspace", "show", "name", keyspaceName} output, err := pdctl.ExecuteCommand(cmd, args...) re.NoError(err) re.NoError(json.Unmarshal(output, &k)) @@ -93,7 +96,7 @@ func TestKeyspace(t *testing.T) { }) // test error name - args := []string{"-u", pdAddr, "keyspace", "error_name"} + args := []string{"-u", pdAddr, "keyspace", "show", "name", "error_name"} output, err := pdctl.ExecuteCommand(cmd, args...) re.NoError(err) re.Contains(string(output), "Fail") @@ -101,3 +104,180 @@ func TestKeyspace(t *testing.T) { re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/tso/fastGroupSplitPatroller")) re.NoError(failpoint.Disable("github.com/tikv/pd/server/delayStartServerLoop")) } + +type keyspaceTestSuite struct { + suite.Suite + ctx context.Context + cancel context.CancelFunc + cluster *tests.TestCluster + pdAddr string +} + +func TestKeyspaceTestSuite(t *testing.T) { + suite.Run(t, new(keyspaceTestSuite)) +} + +func (suite *keyspaceTestSuite) SetupTest() { + suite.ctx, suite.cancel = context.WithCancel(context.Background()) + suite.NoError(failpoint.Enable("github.com/tikv/pd/server/delayStartServerLoop", `return(true)`)) + suite.NoError(failpoint.Enable("github.com/tikv/pd/pkg/keyspace/skipSplitRegion", "return(true)")) + tc, err := tests.NewTestAPICluster(suite.ctx, 1) + suite.NoError(err) + suite.NoError(tc.RunInitialServers()) + tc.WaitLeader() + leaderServer := tc.GetLeaderServer() + suite.NoError(leaderServer.BootstrapCluster()) + suite.cluster = tc + suite.pdAddr = tc.GetConfig().GetClientURL() +} + +func (suite *keyspaceTestSuite) TearDownTest() { + suite.NoError(failpoint.Disable("github.com/tikv/pd/server/delayStartServerLoop")) + suite.NoError(failpoint.Disable("github.com/tikv/pd/pkg/keyspace/skipSplitRegion")) + suite.cancel() +} + +func (suite *keyspaceTestSuite) TestShowKeyspace() { + re := suite.Require() + keyspaceName := 
"DEFAULT" + keyspaceID := uint32(0) + var k1, k2 api.KeyspaceMeta + // Show by name. + args := []string{"-u", suite.pdAddr, "keyspace", "show", "name", keyspaceName} + output, err := pdctl.ExecuteCommand(pdctlCmd.GetRootCmd(), args...) + re.NoError(err) + re.NoError(json.Unmarshal(output, &k1)) + re.Equal(keyspaceName, k1.GetName()) + re.Equal(keyspaceID, k1.GetId()) + // Show by ID. + args = []string{"-u", suite.pdAddr, "keyspace", "show", "id", strconv.Itoa(int(keyspaceID))} + output, err = pdctl.ExecuteCommand(pdctlCmd.GetRootCmd(), args...) + re.NoError(err) + re.NoError(json.Unmarshal(output, &k2)) + re.Equal(k1, k2) +} + +func mustCreateKeyspace(suite *keyspaceTestSuite, param api.CreateKeyspaceParams) api.KeyspaceMeta { + re := suite.Require() + var meta api.KeyspaceMeta + args := []string{"-u", suite.pdAddr, "keyspace", "create", param.Name} + for k, v := range param.Config { + args = append(args, "--config", fmt.Sprintf("%s=%s", k, v)) + } + output, err := pdctl.ExecuteCommand(pdctlCmd.GetRootCmd(), args...) + re.NoError(err) + re.NoError(json.Unmarshal(output, &meta)) + return meta +} + +func (suite *keyspaceTestSuite) TestCreateKeyspace() { + re := suite.Require() + param := api.CreateKeyspaceParams{ + Name: "test_keyspace", + Config: map[string]string{ + "foo": "bar", + "foo2": "bar2", + }, + } + meta := mustCreateKeyspace(suite, param) + re.Equal(param.Name, meta.GetName()) + for k, v := range param.Config { + re.Equal(v, meta.Config[k]) + } +} + +func (suite *keyspaceTestSuite) TestUpdateKeyspaceConfig() { + re := suite.Require() + param := api.CreateKeyspaceParams{ + Name: "test_keyspace", + Config: map[string]string{"foo": "1"}, + } + meta := mustCreateKeyspace(suite, param) + re.Equal("1", meta.Config["foo"]) + + // Update one existing config and add a new config, resulting in config: {foo: 2, foo2: 1}. 
+ args := []string{"-u", suite.pdAddr, "keyspace", "update-config", param.Name, "--update", "foo=2,foo2=1"} + output, err := pdctl.ExecuteCommand(pdctlCmd.GetRootCmd(), args...) + re.NoError(err) + re.NoError(json.Unmarshal(output, &meta)) + re.Equal("test_keyspace", meta.GetName()) + re.Equal("2", meta.Config["foo"]) + re.Equal("1", meta.Config["foo2"]) + // Update one existing config and remove a config, resulting in config: {foo: 3}. + args = []string{"-u", suite.pdAddr, "keyspace", "update-config", param.Name, "--update", "foo=3", "--remove", "foo2"} + output, err = pdctl.ExecuteCommand(pdctlCmd.GetRootCmd(), args...) + re.NoError(err) + re.NoError(json.Unmarshal(output, &meta)) + re.Equal("test_keyspace", meta.GetName()) + re.Equal("3", meta.Config["foo"]) + re.NotContains(meta.GetConfig(), "foo2") + // Error if a key is specified in both --update and --remove list. + args = []string{"-u", suite.pdAddr, "keyspace", "update-config", param.Name, "--update", "foo=4", "--remove", "foo"} + output, err = pdctl.ExecuteCommand(pdctlCmd.GetRootCmd(), args...) + re.NoError(err) + re.Contains(string(output), "Fail") + // Error if a key is specified multiple times. + args = []string{"-u", suite.pdAddr, "keyspace", "update-config", param.Name, "--update", "foo=4,foo=5"} + output, err = pdctl.ExecuteCommand(pdctlCmd.GetRootCmd(), args...) + re.NoError(err) + re.Contains(string(output), "Fail") +} + +func (suite *keyspaceTestSuite) TestUpdateKeyspaceState() { + re := suite.Require() + param := api.CreateKeyspaceParams{ + Name: "test_keyspace", + } + meta := mustCreateKeyspace(suite, param) + re.Equal(keyspacepb.KeyspaceState_ENABLED, meta.State) + // Disable the keyspace, capitalization shouldn't matter. + args := []string{"-u", suite.pdAddr, "keyspace", "update-state", param.Name, "DiSAbleD"} + output, err := pdctl.ExecuteCommand(pdctlCmd.GetRootCmd(), args...) 
+ re.NoError(err) + re.NoError(json.Unmarshal(output, &meta)) + re.Equal(keyspacepb.KeyspaceState_DISABLED, meta.State) + // Tombstone the keyspace without archiving should fail. + args = []string{"-u", suite.pdAddr, "keyspace", "update-state", param.Name, "TOMBSTONE"} + output, err = pdctl.ExecuteCommand(pdctlCmd.GetRootCmd(), args...) + re.NoError(err) + re.Contains(string(output), "Fail") +} + +func (suite *keyspaceTestSuite) TestListKeyspace() { + re := suite.Require() + var param api.CreateKeyspaceParams + for i := 0; i < 10; i++ { + param = api.CreateKeyspaceParams{ + Name: fmt.Sprintf("test_keyspace_%d", i), + Config: map[string]string{ + "foo": fmt.Sprintf("bar_%d", i), + }, + } + mustCreateKeyspace(suite, param) + } + // List all keyspaces, there should be 11 of them (default + 10 created above). + args := []string{"-u", suite.pdAddr, "keyspace", "list"} + output, err := pdctl.ExecuteCommand(pdctlCmd.GetRootCmd(), args...) + re.NoError(err) + var resp api.LoadAllKeyspacesResponse + re.NoError(json.Unmarshal(output, &resp)) + re.Len(resp.Keyspaces, 11) + re.Equal("", resp.NextPageToken) // No next page token since we load them all. + re.Equal("DEFAULT", resp.Keyspaces[0].GetName()) + for i, meta := range resp.Keyspaces[1:] { + re.Equal(fmt.Sprintf("test_keyspace_%d", i), meta.GetName()) + re.Equal(fmt.Sprintf("bar_%d", i), meta.Config["foo"]) + } + // List 3 keyspaces starting with keyspace id 3, should result in keyspace id 3, 4, 5 and next page token 6. + args = []string{"-u", suite.pdAddr, "keyspace", "list", "--limit", "3", "--page_token", "3"} + output, err = pdctl.ExecuteCommand(pdctlCmd.GetRootCmd(), args...) 
+ re.NoError(err) + re.NoError(json.Unmarshal(output, &resp)) + re.Len(resp.Keyspaces, 3) + for i, meta := range resp.Keyspaces { + re.Equal(uint32(i+3), meta.GetId()) + re.Equal(fmt.Sprintf("test_keyspace_%d", i+2), meta.GetName()) + re.Equal(fmt.Sprintf("bar_%d", i+2), meta.Config["foo"]) + } + re.Equal("6", resp.NextPageToken) +} diff --git a/tools/pd-ctl/pdctl/command/keyspace_command.go b/tools/pd-ctl/pdctl/command/keyspace_command.go index a68e2f05a80..7c0d3d78bf6 100644 --- a/tools/pd-ctl/pdctl/command/keyspace_command.go +++ b/tools/pd-ctl/pdctl/command/keyspace_command.go @@ -15,33 +15,278 @@ package command import ( + "bytes" + "encoding/json" "fmt" "net/http" + "strings" "github.com/spf13/cobra" + "github.com/tikv/pd/server/apiv2/handlers" ) -const keyspacePrefix = "pd/api/v2/keyspaces" +const ( + keyspacePrefix = "pd/api/v2/keyspaces" + // flags + nmConfig = "config" + nmLimit = "limit" + nmPageToken = "page_token" + nmRemove = "remove" + nmUpdate = "update" +) // NewKeyspaceCommand returns a keyspace subcommand of rootCmd. 
func NewKeyspaceCommand() *cobra.Command { cmd := &cobra.Command{ - Use: "keyspace [command] [flags]", - Short: "show keyspace information", - Run: showKeyspaceCommandFunc, + Use: "keyspace [flags]", + Short: "keyspace commands", } + cmd.AddCommand(newShowKeyspaceCommand()) + cmd.AddCommand(newCreateKeyspaceCommand()) + cmd.AddCommand(newUpdateKeyspaceConfigCommand()) + cmd.AddCommand(newUpdateKeyspaceStateCommand()) + cmd.AddCommand(newListKeyspaceCommand()) return cmd } -func showKeyspaceCommandFunc(cmd *cobra.Command, args []string) { +func newShowKeyspaceCommand() *cobra.Command { + r := &cobra.Command{ + Use: "show", + Short: "show keyspace metadata", + } + showByID := &cobra.Command{ + Use: "id ", + Short: "show keyspace metadata specified by keyspace id", + Run: showKeyspaceIDCommandFunc, + } + showByName := &cobra.Command{ + Use: "name ", + Short: "show keyspace metadata specified by keyspace name", + Run: showKeyspaceNameCommandFunc, + } + r.AddCommand(showByID) + r.AddCommand(showByName) + return r +} + +func showKeyspaceIDCommandFunc(cmd *cobra.Command, args []string) { if len(args) != 1 { cmd.Usage() return } + resp, err := doRequest(cmd, fmt.Sprintf("%s/id/%s", keyspacePrefix, args[0]), http.MethodGet, http.Header{}) + if err != nil { + cmd.PrintErrln("Failed to get the keyspace information: ", err) + return + } + cmd.Println(resp) +} +func showKeyspaceNameCommandFunc(cmd *cobra.Command, args []string) { + if len(args) != 1 { + cmd.Usage() + return + } resp, err := doRequest(cmd, fmt.Sprintf("%s/%s?force_refresh_group_id=true", keyspacePrefix, args[0]), http.MethodGet, http.Header{}) if err != nil { - cmd.Printf("Failed to get the keyspace information: %s\n", err) + cmd.PrintErrln("Failed to get the keyspace information: ", err) + return + } + cmd.Println(resp) +} + +func newCreateKeyspaceCommand() *cobra.Command { + r := &cobra.Command{ + Use: "create [flags]", + Short: "create a keyspace", + Run: createKeyspaceCommandFunc, + } + 
r.Flags().StringSlice(nmConfig, nil, "keyspace configs for the new keyspace\n"+ + "specify as comma separated key value pairs, e.g. --config k1=v1,k2=v2") + return r +} + +func createKeyspaceCommandFunc(cmd *cobra.Command, args []string) { + if len(args) != 1 { + cmd.Usage() + return + } + + configPairs, err := cmd.Flags().GetStringSlice(nmConfig) + if err != nil { + cmd.PrintErrln("Failed to parse flag: ", err) + return + } + config := map[string]string{} + for _, flag := range configPairs { + kvs := strings.Split(flag, ",") + for _, kv := range kvs { + pair := strings.Split(kv, "=") + if len(pair) != 2 { + cmd.PrintErrf("Failed to create keyspace: invalid kv pair %s\n", kv) + return + } + if _, exist := config[pair[0]]; exist { + cmd.PrintErrf("Failed to create keyspace: key %s is specified multiple times\n", pair[0]) + return + } + config[pair[0]] = pair[1] + } + } + params := handlers.CreateKeyspaceParams{ + Name: args[0], + Config: config, + } + body, err := json.Marshal(params) + if err != nil { + cmd.PrintErrln("Failed to encode the request body: ", err) + return + } + resp, err := doRequest(cmd, keyspacePrefix, http.MethodPost, http.Header{}, WithBody(bytes.NewBuffer(body))) + if err != nil { + cmd.PrintErrln("Failed to create the keyspace: ", err) + return + } + cmd.Println(resp) +} + +func newUpdateKeyspaceConfigCommand() *cobra.Command { + r := &cobra.Command{ + Use: "update-config ", + Short: "update keyspace config", + Run: updateKeyspaceConfigCommandFunc, + } + r.Flags().StringSlice(nmRemove, nil, "keys to remove from keyspace config\n"+ + "specify as comma separated keys, e.g. --remove k1,k2") + r.Flags().StringSlice(nmUpdate, nil, "kv pairs to upsert into keyspace config\n"+ + "specify as comma separated key value pairs, e.g. 
--update k1=v1,k2=v2") + return r +} + +func updateKeyspaceConfigCommandFunc(cmd *cobra.Command, args []string) { + if len(args) != 1 { + cmd.Usage() + return + } + configPatch := map[string]*string{} + removeFlags, err := cmd.Flags().GetStringSlice(nmRemove) + if err != nil { + cmd.PrintErrln("Failed to parse flag: ", err) + return + } + for _, flag := range removeFlags { + keys := strings.Split(flag, ",") + for _, key := range keys { + if _, exist := configPatch[key]; exist { + cmd.PrintErrf("Failed to update keyspace config: key %s is specified multiple times\n", key) + return + } + configPatch[key] = nil + } + } + updateFlags, err := cmd.Flags().GetStringSlice(nmUpdate) + if err != nil { + cmd.PrintErrln("Failed to parse flag: ", err) + return + } + for _, flag := range updateFlags { + kvs := strings.Split(flag, ",") + for _, kv := range kvs { + pair := strings.Split(kv, "=") + if len(pair) != 2 { + cmd.PrintErrf("Failed to update keyspace config: invalid kv pair %s\n", kv) + return + } + if _, exist := configPatch[pair[0]]; exist { + cmd.PrintErrf("Failed to update keyspace config: key %s is specified multiple times\n", pair[0]) + return + } + configPatch[pair[0]] = &pair[1] + } + } + params := handlers.UpdateConfigParams{Config: configPatch} + data, err := json.Marshal(params) + if err != nil { + cmd.PrintErrln("Failed to update keyspace config:", err) + return + } + url := fmt.Sprintf("%s/%s/config", keyspacePrefix, args[0]) + resp, err := doRequest(cmd, url, http.MethodPatch, http.Header{}, WithBody(bytes.NewBuffer(data))) + if err != nil { + cmd.PrintErrln("Failed to update the keyspace config: ", err) + return + } + cmd.Println(resp) +} + +func newUpdateKeyspaceStateCommand() *cobra.Command { + r := &cobra.Command{ + Use: "update-state ", + Long: "update keyspace state, state can be one of: ENABLED, DISABLED, ARCHIVED, TOMBSTONE", + Run: updateKeyspaceStateCommandFunc, + } + return r +} + +func updateKeyspaceStateCommandFunc(cmd *cobra.Command, args 
[]string) { + if len(args) != 2 { + cmd.Usage() + return + } + params := handlers.UpdateStateParam{ + State: args[1], + } + data, err := json.Marshal(params) + if err != nil { + cmd.PrintErrln(err) + return + } + url := fmt.Sprintf("%s/%s/state", keyspacePrefix, args[0]) + resp, err := doRequest(cmd, url, http.MethodPut, http.Header{}, WithBody(bytes.NewBuffer(data))) + if err != nil { + cmd.PrintErrln("Failed to update the keyspace state: ", err) + return + } + cmd.Println(resp) +} + +func newListKeyspaceCommand() *cobra.Command { + r := &cobra.Command{ + Use: "list [flags]", + Short: "list keyspaces according to filters", + Run: listKeyspaceCommandFunc, + } + r.Flags().String(nmLimit, "", "The maximum number of keyspace metas to return. If not set, no limit is posed.") + r.Flags().String(nmPageToken, "", "The keyspace id of the scan start. If not set, scan from keyspace/keyspace group with id 0") + return r +} + +func listKeyspaceCommandFunc(cmd *cobra.Command, args []string) { + if len(args) != 0 { + cmd.Usage() + return + } + + url := keyspacePrefix + limit, err := cmd.Flags().GetString(nmLimit) + if err != nil { + cmd.PrintErrln("Failed to parse flag: ", err) + return + } + if limit != "" { + url += fmt.Sprintf("?limit=%s", limit) + } + pageToken, err := cmd.Flags().GetString(nmPageToken) + if err != nil { + cmd.PrintErrln("Failed to parse flag: ", err) + return + } + if pageToken != "" { + url += fmt.Sprintf("&page_token=%s", pageToken) + } + resp, err := doRequest(cmd, url, http.MethodGet, http.Header{}) + if err != nil { + cmd.PrintErrln("Failed to list keyspace: ", err) return } cmd.Println(resp) From 779b5be8b8df3dae64aa1b3c7f9a7d3904d48007 Mon Sep 17 00:00:00 2001 From: Hu# Date: Wed, 11 Oct 2023 17:27:27 +0800 Subject: [PATCH 4/4] ctl: support new interface to acquire stores by state (#7189) close tikv/pd#7188 Signed-off-by: husharp Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com> --- server/api/router.go | 1 + 
server/api/store.go | 55 ++++++++++++++++++++- server/api/store_test.go | 52 ++++++++++++++++++- tests/pdctl/store/store_test.go | 23 ++++++++- tools/pd-ctl/pdctl/command/store_command.go | 10 +--- 5 files changed, 129 insertions(+), 12 deletions(-) diff --git a/server/api/router.go b/server/api/router.go index f47f0e3ebf2..93811e264f1 100644 --- a/server/api/router.go +++ b/server/api/router.go @@ -226,6 +226,7 @@ func createRouter(prefix string, svr *server.Server) *mux.Router { registerFunc(clusterRouter, "/stores/limit/scene", storesHandler.SetStoreLimitScene, setMethods(http.MethodPost), setAuditBackend(localLog, prometheus)) registerFunc(clusterRouter, "/stores/limit/scene", storesHandler.GetStoreLimitScene, setMethods(http.MethodGet), setAuditBackend(prometheus)) registerFunc(clusterRouter, "/stores/progress", storesHandler.GetStoresProgress, setMethods(http.MethodGet), setAuditBackend(prometheus)) + registerFunc(clusterRouter, "/stores/check", storesHandler.GetStoresByState, setMethods(http.MethodGet), setAuditBackend(prometheus)) labelsHandler := newLabelsHandler(svr, rd) registerFunc(clusterRouter, "/labels", labelsHandler.GetLabels, setMethods(http.MethodGet), setAuditBackend(prometheus)) diff --git a/server/api/store.go b/server/api/store.go index 7c820a3befa..a44850d35cc 100644 --- a/server/api/store.go +++ b/server/api/store.go @@ -31,6 +31,7 @@ import ( "github.com/tikv/pd/pkg/core/storelimit" "github.com/tikv/pd/pkg/errs" sc "github.com/tikv/pd/pkg/schedule/config" + "github.com/tikv/pd/pkg/slice" "github.com/tikv/pd/pkg/utils/apiutil" "github.com/tikv/pd/pkg/utils/typeutil" "github.com/tikv/pd/server" @@ -740,6 +741,7 @@ func (h *storesHandler) GetStoresProgress(w http.ResponseWriter, r *http.Request // @Success 200 {object} StoresInfo // @Failure 500 {string} string "PD server failed to proceed the request." // @Router /stores [get] +// @Deprecated Better to use /stores/check instead. 
func (h *storesHandler) GetAllStores(w http.ResponseWriter, r *http.Request) { rc := getCluster(r) stores := rc.GetMetaStores() @@ -753,7 +755,7 @@ func (h *storesHandler) GetAllStores(w http.ResponseWriter, r *http.Request) { return } - stores = urlFilter.filter(rc.GetMetaStores()) + stores = urlFilter.filter(stores) for _, s := range stores { storeID := s.GetId() store := rc.GetStore(storeID) @@ -770,6 +772,57 @@ func (h *storesHandler) GetAllStores(w http.ResponseWriter, r *http.Request) { h.rd.JSON(w, http.StatusOK, StoresInfo) } +// @Tags store +// @Summary Get all stores by states in the cluster. +// @Param state query array true "Specify accepted store states." +// @Produce json +// @Success 200 {object} StoresInfo +// @Failure 500 {string} string "PD server failed to proceed the request." +// @Router /stores/check [get] +func (h *storesHandler) GetStoresByState(w http.ResponseWriter, r *http.Request) { + rc := getCluster(r) + stores := rc.GetMetaStores() + StoresInfo := &StoresInfo{ + Stores: make([]*StoreInfo, 0, len(stores)), + } + + lowerStateName := []string{strings.ToLower(downStateName), strings.ToLower(disconnectedName)} + for _, v := range metapb.StoreState_name { + lowerStateName = append(lowerStateName, strings.ToLower(v)) + } + + var queryStates []string + if v, ok := r.URL.Query()["state"]; ok { + for _, s := range v { + stateName := strings.ToLower(s) + if stateName != "" && !slice.Contains(lowerStateName, stateName) { + h.rd.JSON(w, http.StatusBadRequest, "unknown StoreState: "+s) + return + } else if stateName != "" { + queryStates = append(queryStates, stateName) + } + } + } + + for _, s := range stores { + storeID := s.GetId() + store := rc.GetStore(storeID) + if store == nil { + h.rd.JSON(w, http.StatusInternalServerError, errs.ErrStoreNotFound.FastGenByArgs(storeID).Error()) + return + } + + storeInfo := newStoreInfo(h.GetScheduleConfig(), store) + if queryStates != nil && !slice.Contains(queryStates, 
strings.ToLower(storeInfo.Store.StateName)) { + continue + } + StoresInfo.Stores = append(StoresInfo.Stores, storeInfo) + } + StoresInfo.Count = len(StoresInfo.Stores) + + h.rd.JSON(w, http.StatusOK, StoresInfo) +} + type storeStateFilter struct { accepts []metapb.StoreState } diff --git a/server/api/store_test.go b/server/api/store_test.go index 4bcdf1953a5..2b3a8dee9bb 100644 --- a/server/api/store_test.go +++ b/server/api/store_test.go @@ -32,6 +32,7 @@ import ( "github.com/tikv/pd/pkg/core" tu "github.com/tikv/pd/pkg/utils/testutil" "github.com/tikv/pd/pkg/utils/typeutil" + "github.com/tikv/pd/pkg/versioninfo" "github.com/tikv/pd/server" "github.com/tikv/pd/server/config" ) @@ -140,17 +141,64 @@ func (suite *storeTestSuite) TestStoresList() { suite.NoError(err) checkStoresInfo(re, info.Stores, suite.stores[:3]) - url = fmt.Sprintf("%s/stores?state=0", suite.urlPrefix) + url = fmt.Sprintf("%s/stores/check?state=up", suite.urlPrefix) info = new(StoresInfo) err = tu.ReadGetJSON(re, testDialClient, url, info) suite.NoError(err) checkStoresInfo(re, info.Stores, suite.stores[:2]) - url = fmt.Sprintf("%s/stores?state=1", suite.urlPrefix) + url = fmt.Sprintf("%s/stores/check?state=offline", suite.urlPrefix) info = new(StoresInfo) err = tu.ReadGetJSON(re, testDialClient, url, info) suite.NoError(err) checkStoresInfo(re, info.Stores, suite.stores[2:3]) + + url = fmt.Sprintf("%s/stores/check?state=tombstone", suite.urlPrefix) + info = new(StoresInfo) + err = tu.ReadGetJSON(re, testDialClient, url, info) + suite.NoError(err) + checkStoresInfo(re, info.Stores, suite.stores[3:]) + + url = fmt.Sprintf("%s/stores/check?state=tombstone&state=offline", suite.urlPrefix) + info = new(StoresInfo) + err = tu.ReadGetJSON(re, testDialClient, url, info) + suite.NoError(err) + checkStoresInfo(re, info.Stores, suite.stores[2:]) + + // down store + s := &server.GrpcServer{Server: suite.svr} + store := &metapb.Store{ + Id: 100, + Address: fmt.Sprintf("tikv%d", 100), + State: 
metapb.StoreState_Up, + Version: versioninfo.MinSupportedVersion(versioninfo.Version2_0).String(), + LastHeartbeat: time.Now().UnixNano() - int64(1*time.Hour), + } + _, err = s.PutStore(context.Background(), &pdpb.PutStoreRequest{ + Header: &pdpb.RequestHeader{ClusterId: suite.svr.ClusterID()}, + Store: store, + }) + re.NoError(err) + + url = fmt.Sprintf("%s/stores/check?state=down", suite.urlPrefix) + info = new(StoresInfo) + err = tu.ReadGetJSON(re, testDialClient, url, info) + suite.NoError(err) + checkStoresInfo(re, info.Stores, []*metapb.Store{store}) + + // disconnect store + store.LastHeartbeat = time.Now().UnixNano() - int64(1*time.Minute) + _, err = s.PutStore(context.Background(), &pdpb.PutStoreRequest{ + Header: &pdpb.RequestHeader{ClusterId: suite.svr.ClusterID()}, + Store: store, + }) + re.NoError(err) + + url = fmt.Sprintf("%s/stores/check?state=disconnected", suite.urlPrefix) + info = new(StoresInfo) + err = tu.ReadGetJSON(re, testDialClient, url, info) + suite.NoError(err) + checkStoresInfo(re, info.Stores, []*metapb.Store{store}) } func (suite *storeTestSuite) TestStoreGet() { diff --git a/tests/pdctl/store/store_test.go b/tests/pdctl/store/store_test.go index 13c7350bb6f..3400841b5ea 100644 --- a/tests/pdctl/store/store_test.go +++ b/tests/pdctl/store/store_test.go @@ -327,7 +327,28 @@ func TestStore(t *testing.T) { args = []string{"-u", pdAddr, "store", "check", "Invalid_State"} output, err = pdctl.ExecuteCommand(cmd, args...) re.NoError(err) - re.Contains(string(output), "Unknown state: Invalid_state") + re.Contains(string(output), "unknown StoreState: Invalid_state") + + // Mock a disconnected store. + storeCheck := &metapb.Store{ + Id: uint64(2000), + State: metapb.StoreState_Up, + NodeState: metapb.NodeState_Serving, + LastHeartbeat: time.Now().UnixNano() - int64(1*time.Minute), + } + tests.MustPutStore(re, cluster, storeCheck) + args = []string{"-u", pdAddr, "store", "check", "Disconnected"} + output, err = pdctl.ExecuteCommand(cmd, args...) 
+ re.NoError(err) + re.Contains(string(output), "\"id\": 2000,") + // Mock a down store. + storeCheck.Id = uint64(2001) + storeCheck.LastHeartbeat = time.Now().UnixNano() - int64(1*time.Hour) + tests.MustPutStore(re, cluster, storeCheck) + args = []string{"-u", pdAddr, "store", "check", "Down"} + output, err = pdctl.ExecuteCommand(cmd, args...) + re.NoError(err) + re.Contains(string(output), "\"id\": 2001,") // store cancel-delete command limit = leaderServer.GetRaftCluster().GetStoreLimitByType(1, storelimit.RemovePeer) diff --git a/tools/pd-ctl/pdctl/command/store_command.go b/tools/pd-ctl/pdctl/command/store_command.go index 1dee1c13a72..357e527c2f4 100644 --- a/tools/pd-ctl/pdctl/command/store_command.go +++ b/tools/pd-ctl/pdctl/command/store_command.go @@ -143,7 +143,7 @@ func NewStoreLimitCommand() *cobra.Command { // NewStoreCheckCommand return a check subcommand of storeCmd func NewStoreCheckCommand() *cobra.Command { d := &cobra.Command{ - Use: "check [up|offline|tombstone]", + Use: "check [up|offline|tombstone|disconnected|down]", Short: "Check all the stores with specified status", Run: storeCheckCommandFunc, } @@ -666,13 +666,7 @@ func storeCheckCommandFunc(cmd *cobra.Command, args []string) { caser := cases.Title(language.Und) state := caser.String(strings.ToLower(args[0])) - stateValue, ok := metapb.StoreState_value[state] - if !ok { - cmd.Println("Unknown state: " + state) - return - } - - prefix := fmt.Sprintf("%s?state=%d", storesPrefix, stateValue) + prefix := fmt.Sprintf("%s/check?state=%s", storesPrefix, state) r, err := doRequest(cmd, prefix, http.MethodGet, http.Header{}) if err != nil { cmd.Printf("Failed to get store: %s\n", err)