From ea28fe83ddbbffc57eaff23b1e7147ff92b8aee8 Mon Sep 17 00:00:00 2001 From: buffer <1045931706@qq.com> Date: Tue, 12 Sep 2023 15:46:41 +0800 Subject: [PATCH 1/4] fix wrong legend name (#7080) close tikv/pd#7081 Signed-off-by: bufferflies <1045931706@qq.com> --- metrics/grafana/pd.json | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/metrics/grafana/pd.json b/metrics/grafana/pd.json index eec379c89ac..3a09bc9e618 100644 --- a/metrics/grafana/pd.json +++ b/metrics/grafana/pd.json @@ -7094,14 +7094,14 @@ "expr": "-sum(delta(pd_scheduler_balance_direction{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\",instance=\"$instance\",type=\"balance-leader-scheduler\"}[1m])) by (source)", "format": "time_series", "intervalFactor": 2, - "legendFormat": "store-{{store}}", + "legendFormat": "store-{{source}}", "refId": "A" }, { "expr": "sum(delta(pd_scheduler_balance_direction{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\",instance=\"$instance\",type=\"balance-leader-scheduler\"}[1m])) by (target)", "format": "time_series", "intervalFactor": 2, - "legendFormat": "store-{{store}}", + "legendFormat": "store-{{target}}", "refId": "B" } ], @@ -7198,14 +7198,14 @@ "expr": "-sum(delta(pd_scheduler_balance_direction{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\",instance=\"$instance\",type=\"balance-region-scheduler\"}[1m])) by (source)", "format": "time_series", "intervalFactor": 2, - "legendFormat": "store-{{store}}", + "legendFormat": "store-{{source}}", "refId": "A" }, { "expr": "sum(delta(pd_scheduler_balance_direction{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\",instance=\"$instance\",type=\"balance-region-scheduler\"}[1m])) by (target)", "format": "time_series", "intervalFactor": 2, - "legendFormat": "store-{{store}}", + "legendFormat": "store-{{target}}", "refId": "B" } ], From 2f57a9f050ebb96863500ccdf386fde1ded9d8c1 Mon Sep 17 00:00:00 2001 From: Ti Chi Robot Date: Tue, 12 Sep 2023 18:36:10 +0800 
Subject: [PATCH 2/4] chore(dashboard): update version to v2023.09.11.1 (#7065) (#7079) close tikv/pd#7066 1. security: improve tidb-dashboard login security, encrypt the login password to avoid transport plain text (required for 6.5 version) 2. debug-api: support pagination for ddl history for debug-api 3. keyvisual: use scanRegions instead of fetch all regions (required for 6.5 version) 4. refine execution plan, now it can be showed as table style Signed-off-by: ti-chi-bot Signed-off-by: nolouch Co-authored-by: Sparkle <1284531+baurine@users.noreply.github.com> Co-authored-by: nolouch Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com> --- go.mod | 2 +- go.sum | 4 ++-- tests/integrations/client/go.mod | 2 +- tests/integrations/client/go.sum | 4 ++-- tests/integrations/mcs/go.mod | 2 +- tests/integrations/mcs/go.sum | 4 ++-- tests/integrations/tso/go.mod | 2 +- tests/integrations/tso/go.sum | 4 ++-- 8 files changed, 12 insertions(+), 12 deletions(-) diff --git a/go.mod b/go.mod index 503ac53f33f..6f6be36d883 100644 --- a/go.mod +++ b/go.mod @@ -33,7 +33,7 @@ require ( github.com/pingcap/kvproto v0.0.0-20230905082026-5336fac26974 github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 github.com/pingcap/sysutil v1.0.1-0.20230407040306-fb007c5aff21 - github.com/pingcap/tidb-dashboard v0.0.0-20230705095454-5e220f970f27 + github.com/pingcap/tidb-dashboard v0.0.0-20230911054332-22add1e00511 github.com/prometheus/client_golang v1.11.1 github.com/prometheus/common v0.26.0 github.com/sasha-s/go-deadlock v0.2.0 diff --git a/go.sum b/go.sum index e826e53af37..8640a98a6b3 100644 --- a/go.sum +++ b/go.sum @@ -446,8 +446,8 @@ github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 h1:HR/ylkkLmGdSSDaD8 github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4= github.com/pingcap/sysutil v1.0.1-0.20230407040306-fb007c5aff21 h1:QV6jqlfOkh8hqvEAgwBZa+4bSgO0EeKC7s5c6Luam2I= 
github.com/pingcap/sysutil v1.0.1-0.20230407040306-fb007c5aff21/go.mod h1:QYnjfA95ZaMefyl1NO8oPtKeb8pYUdnDVhQgf+qdpjM= -github.com/pingcap/tidb-dashboard v0.0.0-20230705095454-5e220f970f27 h1:K9lZMYuDuAiR5kOjFESwJ8KfSb4ui5zX6vZGbUp58uk= -github.com/pingcap/tidb-dashboard v0.0.0-20230705095454-5e220f970f27/go.mod h1:OUzFMMVjR1GKlf4LWLqza9QNKjCrYJ7stVn/3PN0djM= +github.com/pingcap/tidb-dashboard v0.0.0-20230911054332-22add1e00511 h1:oyrCfNlAWmLlUfEr+7YTSBo29SP/J1N8hnxBt5yUABo= +github.com/pingcap/tidb-dashboard v0.0.0-20230911054332-22add1e00511/go.mod h1:EZ90+V5S4TttbYag6oKZ3jcNKRwZe1Mc9vXwOt9JBYw= github.com/pingcap/tipb v0.0.0-20220718022156-3e2483c20a9e h1:FBaTXU8C3xgt/drM58VHxojHo/QoG1oPsgWTGvaSpO4= github.com/pingcap/tipb v0.0.0-20220718022156-3e2483c20a9e/go.mod h1:A7mrd7WHBl1o63LE2bIBGEJMTNWXqhgmYiOvMLxozfs= github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA= diff --git a/tests/integrations/client/go.mod b/tests/integrations/client/go.mod index 72eac02d341..70006a085e9 100644 --- a/tests/integrations/client/go.mod +++ b/tests/integrations/client/go.mod @@ -119,7 +119,7 @@ require ( github.com/pingcap/errcode v0.3.0 // indirect github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c // indirect github.com/pingcap/sysutil v1.0.1-0.20230407040306-fb007c5aff21 // indirect - github.com/pingcap/tidb-dashboard v0.0.0-20230705095454-5e220f970f27 // indirect + github.com/pingcap/tidb-dashboard v0.0.0-20230911054332-22add1e00511 // indirect github.com/pingcap/tipb v0.0.0-20220718022156-3e2483c20a9e // indirect github.com/pkg/errors v0.9.1 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect diff --git a/tests/integrations/client/go.sum b/tests/integrations/client/go.sum index 2e3d6329886..04eb8e8647f 100644 --- a/tests/integrations/client/go.sum +++ b/tests/integrations/client/go.sum @@ -410,8 +410,8 @@ github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 h1:HR/ylkkLmGdSSDaD8 
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4= github.com/pingcap/sysutil v1.0.1-0.20230407040306-fb007c5aff21 h1:QV6jqlfOkh8hqvEAgwBZa+4bSgO0EeKC7s5c6Luam2I= github.com/pingcap/sysutil v1.0.1-0.20230407040306-fb007c5aff21/go.mod h1:QYnjfA95ZaMefyl1NO8oPtKeb8pYUdnDVhQgf+qdpjM= -github.com/pingcap/tidb-dashboard v0.0.0-20230705095454-5e220f970f27 h1:K9lZMYuDuAiR5kOjFESwJ8KfSb4ui5zX6vZGbUp58uk= -github.com/pingcap/tidb-dashboard v0.0.0-20230705095454-5e220f970f27/go.mod h1:OUzFMMVjR1GKlf4LWLqza9QNKjCrYJ7stVn/3PN0djM= +github.com/pingcap/tidb-dashboard v0.0.0-20230911054332-22add1e00511 h1:oyrCfNlAWmLlUfEr+7YTSBo29SP/J1N8hnxBt5yUABo= +github.com/pingcap/tidb-dashboard v0.0.0-20230911054332-22add1e00511/go.mod h1:EZ90+V5S4TttbYag6oKZ3jcNKRwZe1Mc9vXwOt9JBYw= github.com/pingcap/tipb v0.0.0-20220718022156-3e2483c20a9e h1:FBaTXU8C3xgt/drM58VHxojHo/QoG1oPsgWTGvaSpO4= github.com/pingcap/tipb v0.0.0-20220718022156-3e2483c20a9e/go.mod h1:A7mrd7WHBl1o63LE2bIBGEJMTNWXqhgmYiOvMLxozfs= github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA= diff --git a/tests/integrations/mcs/go.mod b/tests/integrations/mcs/go.mod index 62b4b022ead..98c8a420238 100644 --- a/tests/integrations/mcs/go.mod +++ b/tests/integrations/mcs/go.mod @@ -119,7 +119,7 @@ require ( github.com/pingcap/errcode v0.3.0 // indirect github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c // indirect github.com/pingcap/sysutil v1.0.1-0.20230407040306-fb007c5aff21 // indirect - github.com/pingcap/tidb-dashboard v0.0.0-20230705095454-5e220f970f27 // indirect + github.com/pingcap/tidb-dashboard v0.0.0-20230911054332-22add1e00511 // indirect github.com/pingcap/tipb v0.0.0-20220718022156-3e2483c20a9e // indirect github.com/pkg/errors v0.9.1 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect diff --git a/tests/integrations/mcs/go.sum b/tests/integrations/mcs/go.sum index 
a759c68761d..801d9b926cc 100644 --- a/tests/integrations/mcs/go.sum +++ b/tests/integrations/mcs/go.sum @@ -414,8 +414,8 @@ github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 h1:HR/ylkkLmGdSSDaD8 github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4= github.com/pingcap/sysutil v1.0.1-0.20230407040306-fb007c5aff21 h1:QV6jqlfOkh8hqvEAgwBZa+4bSgO0EeKC7s5c6Luam2I= github.com/pingcap/sysutil v1.0.1-0.20230407040306-fb007c5aff21/go.mod h1:QYnjfA95ZaMefyl1NO8oPtKeb8pYUdnDVhQgf+qdpjM= -github.com/pingcap/tidb-dashboard v0.0.0-20230705095454-5e220f970f27 h1:K9lZMYuDuAiR5kOjFESwJ8KfSb4ui5zX6vZGbUp58uk= -github.com/pingcap/tidb-dashboard v0.0.0-20230705095454-5e220f970f27/go.mod h1:OUzFMMVjR1GKlf4LWLqza9QNKjCrYJ7stVn/3PN0djM= +github.com/pingcap/tidb-dashboard v0.0.0-20230911054332-22add1e00511 h1:oyrCfNlAWmLlUfEr+7YTSBo29SP/J1N8hnxBt5yUABo= +github.com/pingcap/tidb-dashboard v0.0.0-20230911054332-22add1e00511/go.mod h1:EZ90+V5S4TttbYag6oKZ3jcNKRwZe1Mc9vXwOt9JBYw= github.com/pingcap/tipb v0.0.0-20220718022156-3e2483c20a9e h1:FBaTXU8C3xgt/drM58VHxojHo/QoG1oPsgWTGvaSpO4= github.com/pingcap/tipb v0.0.0-20220718022156-3e2483c20a9e/go.mod h1:A7mrd7WHBl1o63LE2bIBGEJMTNWXqhgmYiOvMLxozfs= github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA= diff --git a/tests/integrations/tso/go.mod b/tests/integrations/tso/go.mod index 84825b5a466..a9788e01ed2 100644 --- a/tests/integrations/tso/go.mod +++ b/tests/integrations/tso/go.mod @@ -117,7 +117,7 @@ require ( github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c // indirect github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 // indirect github.com/pingcap/sysutil v1.0.1-0.20230407040306-fb007c5aff21 // indirect - github.com/pingcap/tidb-dashboard v0.0.0-20230705095454-5e220f970f27 // indirect + github.com/pingcap/tidb-dashboard v0.0.0-20230911054332-22add1e00511 // indirect github.com/pingcap/tipb 
v0.0.0-20220718022156-3e2483c20a9e // indirect github.com/pkg/errors v0.9.1 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect diff --git a/tests/integrations/tso/go.sum b/tests/integrations/tso/go.sum index 6f9c7f8f2cd..891bc42d24f 100644 --- a/tests/integrations/tso/go.sum +++ b/tests/integrations/tso/go.sum @@ -408,8 +408,8 @@ github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 h1:HR/ylkkLmGdSSDaD8 github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4= github.com/pingcap/sysutil v1.0.1-0.20230407040306-fb007c5aff21 h1:QV6jqlfOkh8hqvEAgwBZa+4bSgO0EeKC7s5c6Luam2I= github.com/pingcap/sysutil v1.0.1-0.20230407040306-fb007c5aff21/go.mod h1:QYnjfA95ZaMefyl1NO8oPtKeb8pYUdnDVhQgf+qdpjM= -github.com/pingcap/tidb-dashboard v0.0.0-20230705095454-5e220f970f27 h1:K9lZMYuDuAiR5kOjFESwJ8KfSb4ui5zX6vZGbUp58uk= -github.com/pingcap/tidb-dashboard v0.0.0-20230705095454-5e220f970f27/go.mod h1:OUzFMMVjR1GKlf4LWLqza9QNKjCrYJ7stVn/3PN0djM= +github.com/pingcap/tidb-dashboard v0.0.0-20230911054332-22add1e00511 h1:oyrCfNlAWmLlUfEr+7YTSBo29SP/J1N8hnxBt5yUABo= +github.com/pingcap/tidb-dashboard v0.0.0-20230911054332-22add1e00511/go.mod h1:EZ90+V5S4TttbYag6oKZ3jcNKRwZe1Mc9vXwOt9JBYw= github.com/pingcap/tipb v0.0.0-20220718022156-3e2483c20a9e h1:FBaTXU8C3xgt/drM58VHxojHo/QoG1oPsgWTGvaSpO4= github.com/pingcap/tipb v0.0.0-20220718022156-3e2483c20a9e/go.mod h1:A7mrd7WHBl1o63LE2bIBGEJMTNWXqhgmYiOvMLxozfs= github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA= From b8d9c6e04992d233ee60bc0213cc9251c0b74110 Mon Sep 17 00:00:00 2001 From: lhy1024 Date: Wed, 13 Sep 2023 12:12:07 +0800 Subject: [PATCH 3/4] mcs: watch scheduling service's primary address (#7072) ref tikv/pd#5839 Signed-off-by: lhy1024 Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com> --- pkg/member/participant.go | 33 ++++++++--------- pkg/storage/endpoint/key_path.go | 
6 ++++ server/api/server.go | 5 ++- server/server.go | 36 +++++++++++++------ tests/cluster.go | 5 +++ .../mcs/scheduling/server_test.go | 15 ++++++-- 6 files changed, 68 insertions(+), 32 deletions(-) diff --git a/pkg/member/participant.go b/pkg/member/participant.go index 27dced57791..b3034a86807 100644 --- a/pkg/member/participant.go +++ b/pkg/member/participant.go @@ -166,15 +166,7 @@ func (m *Participant) setLeader(member participant) { // unsetLeader unsets the member's leader. func (m *Participant) unsetLeader() { - var leader participant - switch m.serviceName { - case utils.TSOServiceName: - leader = &tsopb.Participant{} - case utils.SchedulingServiceName: - leader = &schedulingpb.Participant{} - case utils.ResourceManagerServiceName: - leader = &resource_manager.Participant{} - } + leader := NewParticipantByService(m.serviceName) m.leader.Store(leader) m.lastLeaderUpdatedTime.Store(time.Now()) } @@ -225,15 +217,7 @@ func (m *Participant) PreCheckLeader() error { // getPersistentLeader gets the corresponding leader from etcd by given leaderPath (as the key). func (m *Participant) getPersistentLeader() (participant, int64, error) { - var leader participant - switch m.serviceName { - case utils.TSOServiceName: - leader = &tsopb.Participant{} - case utils.SchedulingServiceName: - leader = &schedulingpb.Participant{} - case utils.ResourceManagerServiceName: - leader = &resource_manager.Participant{} - } + leader := NewParticipantByService(m.serviceName) ok, rev, err := etcdutil.GetProtoMsgWithModRev(m.client, m.GetLeaderPath(), leader) if err != nil { return nil, 0, err @@ -399,3 +383,16 @@ func (m *Participant) campaignCheck() bool { func (m *Participant) SetCampaignChecker(checker leadershipCheckFunc) { m.campaignChecker.Store(checker) } + +// NewParticipantByService creates a new participant by service name. 
+func NewParticipantByService(serviceName string) (p participant) { + switch serviceName { + case utils.TSOServiceName: + p = &tsopb.Participant{} + case utils.SchedulingServiceName: + p = &schedulingpb.Participant{} + case utils.ResourceManagerServiceName: + p = &resource_manager.Participant{} + } + return p +} diff --git a/pkg/storage/endpoint/key_path.go b/pkg/storage/endpoint/key_path.go index 85af79203a4..4b67441a5ac 100644 --- a/pkg/storage/endpoint/key_path.go +++ b/pkg/storage/endpoint/key_path.go @@ -325,6 +325,12 @@ func KeyspaceGroupPrimaryPath(rootPath string, keyspaceGroupID uint32) string { return path.Join(electionPath, utils.PrimaryKey) } +// SchedulingPrimaryPath returns the path of scheduling primary. +// Path: /ms/{cluster_id}/scheduling/primary +func SchedulingPrimaryPath(clusterID uint64) string { + return path.Join(SchedulingSvcRootPath(clusterID), utils.PrimaryKey) +} + // KeyspaceGroupsElectionPath returns the path of keyspace groups election. // default keyspace group: "/ms/{cluster_id}/tso/00000". // non-default keyspace group: "/ms/{cluster_id}/tso/keyspace_groups/election/{group}". 
diff --git a/server/api/server.go b/server/api/server.go index 272e76cc60b..1d881022c04 100644 --- a/server/api/server.go +++ b/server/api/server.go @@ -20,6 +20,7 @@ import ( "github.com/gorilla/mux" tsoapi "github.com/tikv/pd/pkg/mcs/tso/server/apis/v1" + mcs "github.com/tikv/pd/pkg/mcs/utils" "github.com/tikv/pd/pkg/utils/apiutil" "github.com/tikv/pd/pkg/utils/apiutil/serverapi" "github.com/tikv/pd/server" @@ -39,7 +40,9 @@ func NewHandler(_ context.Context, svr *server.Server) (http.Handler, apiutil.AP router.PathPrefix(apiPrefix).Handler(negroni.New( serverapi.NewRuntimeServiceValidator(svr, group), serverapi.NewRedirector(svr, serverapi.MicroserviceRedirectRule( - apiPrefix+"/api/v1"+"/admin/reset-ts", tsoapi.APIPathPrefix+"/admin/reset-ts", "tso")), + apiPrefix+"/api/v1"+"/admin/reset-ts", + tsoapi.APIPathPrefix+"/admin/reset-ts", + mcs.TSOServiceName)), negroni.Wrap(r)), ) diff --git a/server/server.go b/server/server.go index b74ca5b57b3..2a076923caf 100644 --- a/server/server.go +++ b/server/server.go @@ -226,10 +226,11 @@ type Server struct { auditBackends []audit.Backend - registry *registry.ServiceRegistry - mode string - servicePrimaryMap sync.Map /* Store as map[string]string */ - tsoPrimaryWatcher *etcdutil.LoopWatcher + registry *registry.ServiceRegistry + mode string + servicePrimaryMap sync.Map /* Store as map[string]string */ + tsoPrimaryWatcher *etcdutil.LoopWatcher + schedulingPrimaryWatcher *etcdutil.LoopWatcher } // HandlerBuilder builds a server HTTP handler. 
@@ -617,7 +618,7 @@ func (s *Server) startServerLoop(ctx context.Context) { go s.encryptionKeyManagerLoop() if s.IsAPIServiceMode() { s.initTSOPrimaryWatcher() - s.tsoPrimaryWatcher.StartWatchLoop() + s.initSchedulingPrimaryWatcher() } } @@ -1962,8 +1963,20 @@ func (s *Server) initTSOPrimaryWatcher() { serviceName := mcs.TSOServiceName tsoRootPath := endpoint.TSOSvcRootPath(s.clusterID) tsoServicePrimaryKey := endpoint.KeyspaceGroupPrimaryPath(tsoRootPath, mcs.DefaultKeyspaceGroupID) + s.tsoPrimaryWatcher = s.initServicePrimaryWatcher(serviceName, tsoServicePrimaryKey) + s.tsoPrimaryWatcher.StartWatchLoop() +} + +func (s *Server) initSchedulingPrimaryWatcher() { + serviceName := mcs.SchedulingServiceName + primaryKey := endpoint.SchedulingPrimaryPath(s.clusterID) + s.schedulingPrimaryWatcher = s.initServicePrimaryWatcher(serviceName, primaryKey) + s.schedulingPrimaryWatcher.StartWatchLoop() +} + +func (s *Server) initServicePrimaryWatcher(serviceName string, primaryKey string) *etcdutil.LoopWatcher { putFn := func(kv *mvccpb.KeyValue) error { - primary := &tsopb.Participant{} // TODO: use Generics + primary := member.NewParticipantByService(serviceName) if err := proto.Unmarshal(kv.Value, primary); err != nil { return err } @@ -1971,7 +1984,7 @@ func (s *Server) initTSOPrimaryWatcher() { if len(listenUrls) > 0 { // listenUrls[0] is the primary service endpoint of the keyspace group s.servicePrimaryMap.Store(serviceName, listenUrls[0]) - log.Info("update tso primary", zap.String("primary", listenUrls[0])) + log.Info("update service primary", zap.String("service-name", serviceName), zap.String("primary", listenUrls[0])) } return nil } @@ -1981,16 +1994,17 @@ func (s *Server) initTSOPrimaryWatcher() { if ok { oldPrimary = v.(string) } - log.Info("delete tso primary", zap.String("old-primary", oldPrimary)) + log.Info("delete service primary", zap.String("service-name", serviceName), zap.String("old-primary", oldPrimary)) s.servicePrimaryMap.Delete(serviceName) return 
nil } - s.tsoPrimaryWatcher = etcdutil.NewLoopWatcher( + name := fmt.Sprintf("%s-primary-watcher", serviceName) + return etcdutil.NewLoopWatcher( s.serverLoopCtx, &s.serverLoopWg, s.client, - "tso-primary-watcher", - tsoServicePrimaryKey, + name, + primaryKey, putFn, deleteFn, func() error { return nil }, diff --git a/tests/cluster.go b/tests/cluster.go index 607955cc6a9..ce8293531cd 100644 --- a/tests/cluster.go +++ b/tests/cluster.go @@ -433,6 +433,11 @@ func (s *TestServer) GetTSOAllocatorManager() *tso.AllocatorManager { return s.server.GetTSOAllocatorManager() } +// GetServicePrimaryAddr returns the primary address of the service. +func (s *TestServer) GetServicePrimaryAddr(ctx context.Context, serviceName string) (string, bool) { + return s.server.GetServicePrimaryAddr(ctx, serviceName) +} + // TestCluster is only for test. type TestCluster struct { config *clusterConfig diff --git a/tests/integrations/mcs/scheduling/server_test.go b/tests/integrations/mcs/scheduling/server_test.go index 9b5371deb62..54994bbc34b 100644 --- a/tests/integrations/mcs/scheduling/server_test.go +++ b/tests/integrations/mcs/scheduling/server_test.go @@ -21,6 +21,7 @@ import ( "github.com/pingcap/failpoint" "github.com/stretchr/testify/suite" + mcs "github.com/tikv/pd/pkg/mcs/utils" "github.com/tikv/pd/pkg/utils/testutil" "github.com/tikv/pd/tests" "go.uber.org/goleak" @@ -107,11 +108,21 @@ func (suite *serverTestSuite) TestPrimaryChange() { defer tc.Destroy() tc.WaitForPrimaryServing(re) primary := tc.GetPrimaryServer() - addr := primary.GetAddr() + oldPrimaryAddr := primary.GetAddr() re.Len(primary.GetCluster().GetCoordinator().GetSchedulersController().GetSchedulerNames(), 5) + testutil.Eventually(re, func() bool { + watchedAddr, ok := suite.pdLeader.GetServicePrimaryAddr(suite.ctx, mcs.SchedulingServiceName) + return ok && oldPrimaryAddr == watchedAddr + }) + // transfer leader primary.Close() tc.WaitForPrimaryServing(re) primary = tc.GetPrimaryServer() - re.NotEqual(addr, 
primary.GetAddr()) + newPrimaryAddr := primary.GetAddr() + re.NotEqual(oldPrimaryAddr, newPrimaryAddr) re.Len(primary.GetCluster().GetCoordinator().GetSchedulersController().GetSchedulerNames(), 5) + testutil.Eventually(re, func() bool { + watchedAddr, ok := suite.pdLeader.GetServicePrimaryAddr(suite.ctx, mcs.SchedulingServiceName) + return ok && newPrimaryAddr == watchedAddr + }) } From ae39fbf63ceed79c97842c057399dd3523292182 Mon Sep 17 00:00:00 2001 From: lhy1024 Date: Wed, 13 Sep 2023 18:56:36 +0800 Subject: [PATCH 4/4] move operator handler Signed-off-by: lhy1024 --- pkg/errs/error.go | 42 +++ pkg/schedule/handler/handler.go | 534 +++++++++++++++++++++++++++++++ server/api/hot_status.go | 5 +- server/api/label.go | 3 +- server/api/region.go | 7 +- server/api/rule.go | 2 +- server/api/store.go | 6 +- server/cluster/cluster.go | 4 +- server/grpc_service.go | 4 +- server/handler.go | 545 ++------------------------------ 10 files changed, 622 insertions(+), 530 deletions(-) create mode 100644 pkg/errs/error.go create mode 100644 pkg/schedule/handler/handler.go diff --git a/pkg/errs/error.go b/pkg/errs/error.go new file mode 100644 index 00000000000..5b3830a7024 --- /dev/null +++ b/pkg/errs/error.go @@ -0,0 +1,42 @@ +// Copyright 2023 TiKV Project Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package errs + +import "github.com/pingcap/errors" + +var ( + // ErrOperatorNotFound is error info for operator not found. 
+ ErrOperatorNotFound = errors.New("operator not found") + // ErrAddOperator is error info for already have an operator when adding operator. + ErrAddOperator = errors.New("failed to add operator, maybe already have one") + // ErrRegionNotAdjacent is error info for region not adjacent. + ErrRegionNotAdjacent = errors.New("two regions are not adjacent") + // ErrRegionNotFound is error info for region not found. + ErrRegionNotFound = func(regionID uint64) error { + return errors.Errorf("region %v not found", regionID) + } + // ErrRegionAbnormalPeer is error info for region has abnormal peer. + ErrRegionAbnormalPeer = func(regionID uint64) error { + return errors.Errorf("region %v has abnormal peer", regionID) + } + // ErrStoreNotFoundByID is error info for store not found. + ErrStoreNotFoundByID = func(storeID uint64) error { + return errors.Errorf("store %v not found", storeID) + } + // ErrPluginNotFound is error info for plugin not found. + ErrPluginNotFound = func(pluginPath string) error { + return errors.Errorf("plugin is not found: %s", pluginPath) + } +) \ No newline at end of file diff --git a/pkg/schedule/handler/handler.go b/pkg/schedule/handler/handler.go new file mode 100644 index 00000000000..6212e3bbd9d --- /dev/null +++ b/pkg/schedule/handler/handler.go @@ -0,0 +1,534 @@ +// Copyright 2023 TiKV Project Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package handler + +import ( + "bytes" + "encoding/hex" + "strings" + "time" + + "github.com/pingcap/kvproto/pkg/metapb" + "github.com/pingcap/kvproto/pkg/pdpb" + "github.com/pingcap/log" + "github.com/pkg/errors" + "github.com/tikv/pd/pkg/core" + "github.com/tikv/pd/pkg/errs" + "github.com/tikv/pd/pkg/schedule" + sche "github.com/tikv/pd/pkg/schedule/core" + "github.com/tikv/pd/pkg/schedule/filter" + "github.com/tikv/pd/pkg/schedule/operator" + "github.com/tikv/pd/pkg/schedule/placement" + "github.com/tikv/pd/pkg/schedule/scatter" +) + +type Server interface { + GetCoordinator() (*schedule.Coordinator, error) + GetCluster() (sche.SharedCluster, error) +} + +type Handler struct { + Server +} + +// NewHandler creates a new handler. +func NewHandler(server Server) *Handler { + return &Handler{ + Server: server, + } +} + +// GetOperatorController returns OperatorController. +func (h *Handler) GetOperatorController() (*operator.Controller, error) { + co, err := h.GetCoordinator() + if err != nil { + return nil, err + } + if co == nil { + return nil, errs.ErrNotBootstrapped.GenWithStackByArgs() + } + return co.GetOperatorController(), nil +} + +// GetRegionScatterer returns RegionScatterer. +func (h *Handler) GetRegionScatterer() (*scatter.RegionScatterer, error) { + co, err := h.GetCoordinator() + if err != nil { + return nil, err + } + if co == nil { + return nil, errs.ErrNotBootstrapped.GenWithStackByArgs() + } + return co.GetRegionScatterer(), nil +} + +// GetOperator returns the region operator. +func (h *Handler) GetOperator(regionID uint64) (*operator.Operator, error) { + c, err := h.GetOperatorController() + if err != nil { + return nil, err + } + + op := c.GetOperator(regionID) + if op == nil { + return nil, errs.ErrOperatorNotFound + } + + return op, nil +} + +// GetOperatorStatus returns the status of the region operator. 
+func (h *Handler) GetOperatorStatus(regionID uint64) (*operator.OpWithStatus, error) { + c, err := h.GetOperatorController() + if err != nil { + return nil, err + } + + op := c.GetOperatorStatus(regionID) + if op == nil { + return nil, errs.ErrOperatorNotFound + } + + return op, nil +} + +// RemoveOperator removes the region operator. +func (h *Handler) RemoveOperator(regionID uint64) error { + c, err := h.GetOperatorController() + if err != nil { + return err + } + + op := c.GetOperator(regionID) + if op == nil { + return errs.ErrOperatorNotFound + } + + _ = c.RemoveOperator(op, operator.AdminStop) + return nil +} + +// GetOperators returns the running operators. +func (h *Handler) GetOperators() ([]*operator.Operator, error) { + c, err := h.GetOperatorController() + if err != nil { + return nil, err + } + return c.GetOperators(), nil +} + +// GetWaitingOperators returns the waiting operators. +func (h *Handler) GetWaitingOperators() ([]*operator.Operator, error) { + c, err := h.GetOperatorController() + if err != nil { + return nil, err + } + return c.GetWaitingOperators(), nil +} + +// GetAdminOperators returns the running admin operators. +func (h *Handler) GetAdminOperators() ([]*operator.Operator, error) { + c, err := h.GetOperatorController() + if err != nil { + return nil, err + } + return c.GetOperatorsOfKind(operator.OpAdmin), nil +} + +// GetLeaderOperators returns the running leader operators. +func (h *Handler) GetLeaderOperators() ([]*operator.Operator, error) { + c, err := h.GetOperatorController() + if err != nil { + return nil, err + } + return c.GetOperatorsOfKind(operator.OpLeader), nil +} + +// GetRegionOperators returns the running region operators. +func (h *Handler) GetRegionOperators() ([]*operator.Operator, error) { + c, err := h.GetOperatorController() + if err != nil { + return nil, err + } + return c.GetOperatorsOfKind(operator.OpRegion), nil +} + +// GetHistory returns finished operators' history since start. 
+func (h *Handler) GetHistory(start time.Time) ([]operator.OpHistory, error) { + c, err := h.GetOperatorController() + if err != nil { + return nil, err + } + return c.GetHistory(start), nil +} + +// GetRecords returns finished operators since start. +func (h *Handler) GetRecords(from time.Time) ([]*operator.OpRecord, error) { + c, err := h.GetOperatorController() + if err != nil { + return nil, err + } + records := c.GetRecords(from) + if len(records) == 0 { + return nil, errs.ErrOperatorNotFound + } + return records, nil +} + +// AddTransferLeaderOperator adds an operator to transfer leader to the store. +func (h *Handler) AddTransferLeaderOperator(regionID uint64, storeID uint64) error { + c, err := h.GetCluster() + if err != nil { + return err + } + + region := c.GetRegion(regionID) + if region == nil { + return errs.ErrRegionNotFound(regionID) + } + + newLeader := region.GetStoreVoter(storeID) + if newLeader == nil { + return errors.Errorf("region has no voter in store %v", storeID) + } + + op, err := operator.CreateTransferLeaderOperator("admin-transfer-leader", c, region, region.GetLeader().GetStoreId(), newLeader.GetStoreId(), []uint64{}, operator.OpAdmin) + if err != nil { + log.Debug("fail to create transfer leader operator", errs.ZapError(err)) + return err + } + return h.addOperator(op) +} + +// AddTransferRegionOperator adds an operator to transfer region to the stores. +func (h *Handler) AddTransferRegionOperator(regionID uint64, storeIDs map[uint64]placement.PeerRoleType) error { + c, err := h.GetCluster() + if err != nil { + return err + } + + region := c.GetRegion(regionID) + if region == nil { + return errs.ErrRegionNotFound(regionID) + } + + if c.GetSharedConfig().IsPlacementRulesEnabled() { + // Cannot determine role without peer role when placement rules enabled. Not supported now. 
+ for _, role := range storeIDs { + if len(role) == 0 { + return errors.New("transfer region without peer role is not supported when placement rules enabled") + } + } + } + for id := range storeIDs { + if err := checkStoreState(c, id); err != nil { + return err + } + } + + roles := make(map[uint64]placement.PeerRoleType) + for id, peerRole := range storeIDs { + if peerRole == "" { + peerRole = placement.Voter + } + roles[id] = peerRole + } + op, err := operator.CreateMoveRegionOperator("admin-move-region", c, region, operator.OpAdmin, roles) + if err != nil { + log.Debug("fail to create move region operator", errs.ZapError(err)) + return err + } + return h.addOperator(op) +} + +// AddTransferPeerOperator adds an operator to transfer peer. +func (h *Handler) AddTransferPeerOperator(regionID uint64, fromStoreID, toStoreID uint64) error { + c, err := h.GetCluster() + if err != nil { + return err + } + + region := c.GetRegion(regionID) + if region == nil { + return errs.ErrRegionNotFound(regionID) + } + + oldPeer := region.GetStorePeer(fromStoreID) + if oldPeer == nil { + return errors.Errorf("region has no peer in store %v", fromStoreID) + } + + if err := checkStoreState(c, toStoreID); err != nil { + return err + } + + newPeer := &metapb.Peer{StoreId: toStoreID, Role: oldPeer.GetRole(), IsWitness: oldPeer.GetIsWitness()} + op, err := operator.CreateMovePeerOperator("admin-move-peer", c, region, operator.OpAdmin, fromStoreID, newPeer) + if err != nil { + log.Debug("fail to create move peer operator", errs.ZapError(err)) + return err + } + return h.addOperator(op) +} + +// checkAdminAddPeerOperator checks adminAddPeer operator with given region ID and store ID. 
+func (h *Handler) checkAdminAddPeerOperator(regionID uint64, toStoreID uint64) (sche.SharedCluster, *core.RegionInfo, error) { + c, err := h.GetCluster() + if err != nil { + return nil, nil, err + } + + region := c.GetRegion(regionID) + if region == nil { + return nil, nil, errs.ErrRegionNotFound(regionID) + } + + if region.GetStorePeer(toStoreID) != nil { + return nil, nil, errors.Errorf("region already has peer in store %v", toStoreID) + } + + if err := checkStoreState(c, toStoreID); err != nil { + return nil, nil, err + } + + return c, region, nil +} + +// AddAddPeerOperator adds an operator to add peer. +func (h *Handler) AddAddPeerOperator(regionID uint64, toStoreID uint64) error { + c, region, err := h.checkAdminAddPeerOperator(regionID, toStoreID) + if err != nil { + return err + } + + newPeer := &metapb.Peer{StoreId: toStoreID} + op, err := operator.CreateAddPeerOperator("admin-add-peer", c, region, newPeer, operator.OpAdmin) + if err != nil { + log.Debug("fail to create add peer operator", errs.ZapError(err)) + return err + } + return h.addOperator(op) +} + +// AddAddLearnerOperator adds an operator to add learner. +func (h *Handler) AddAddLearnerOperator(regionID uint64, toStoreID uint64) error { + c, region, err := h.checkAdminAddPeerOperator(regionID, toStoreID) + if err != nil { + return err + } + + newPeer := &metapb.Peer{ + StoreId: toStoreID, + Role: metapb.PeerRole_Learner, + } + + op, err := operator.CreateAddPeerOperator("admin-add-learner", c, region, newPeer, operator.OpAdmin) + if err != nil { + log.Debug("fail to create add learner operator", errs.ZapError(err)) + return err + } + return h.addOperator(op) +} + +// AddRemovePeerOperator adds an operator to remove peer. 
+func (h *Handler) AddRemovePeerOperator(regionID uint64, fromStoreID uint64) error { + c, err := h.GetCluster() + if err != nil { + return err + } + + region := c.GetRegion(regionID) + if region == nil { + return errs.ErrRegionNotFound(regionID) + } + + if region.GetStorePeer(fromStoreID) == nil { + return errors.Errorf("region has no peer in store %v", fromStoreID) + } + + op, err := operator.CreateRemovePeerOperator("admin-remove-peer", c, operator.OpAdmin, region, fromStoreID) + if err != nil { + log.Debug("fail to create move peer operator", errs.ZapError(err)) + return err + } + return h.addOperator(op) +} + +// AddMergeRegionOperator adds an operator to merge region. +func (h *Handler) AddMergeRegionOperator(regionID uint64, targetID uint64) error { + c, err := h.GetCluster() + if err != nil { + return err + } + + region := c.GetRegion(regionID) + if region == nil { + return errs.ErrRegionNotFound(regionID) + } + + target := c.GetRegion(targetID) + if target == nil { + return errs.ErrRegionNotFound(targetID) + } + + if !filter.IsRegionHealthy(region) || !filter.IsRegionReplicated(c, region) { + return errs.ErrRegionAbnormalPeer(regionID) + } + + if !filter.IsRegionHealthy(target) || !filter.IsRegionReplicated(c, target) { + return errs.ErrRegionAbnormalPeer(targetID) + } + + // for the case first region (start key is nil) with the last region (end key is nil) but not adjacent + if (!bytes.Equal(region.GetStartKey(), target.GetEndKey()) || len(region.GetStartKey()) == 0) && + (!bytes.Equal(region.GetEndKey(), target.GetStartKey()) || len(region.GetEndKey()) == 0) { + return errs.ErrRegionNotAdjacent + } + + ops, err := operator.CreateMergeRegionOperator("admin-merge-region", c, region, target, operator.OpAdmin) + if err != nil { + log.Debug("fail to create merge region operator", errs.ZapError(err)) + return err + } + return h.addOperator(ops...) +} + +// AddSplitRegionOperator adds an operator to split a region. 
+func (h *Handler) AddSplitRegionOperator(regionID uint64, policyStr string, keys []string) error { + c, err := h.GetCluster() + if err != nil { + return err + } + + region := c.GetRegion(regionID) + if region == nil { + return errs.ErrRegionNotFound(regionID) + } + + policy, ok := pdpb.CheckPolicy_value[strings.ToUpper(policyStr)] + if !ok { + return errors.Errorf("check policy %s is not supported", policyStr) + } + + var splitKeys [][]byte + if pdpb.CheckPolicy(policy) == pdpb.CheckPolicy_USEKEY { + for i := range keys { + k, err := hex.DecodeString(keys[i]) + if err != nil { + return errors.Errorf("split key %s is not in hex format", keys[i]) + } + splitKeys = append(splitKeys, k) + } + } + + op, err := operator.CreateSplitRegionOperator("admin-split-region", region, operator.OpAdmin, pdpb.CheckPolicy(policy), splitKeys) + if err != nil { + return err + } + + return h.addOperator(op) +} + +// AddScatterRegionOperator adds an operator to scatter a region. +func (h *Handler) AddScatterRegionOperator(regionID uint64, group string) error { + c, err := h.GetCluster() + if err != nil { + return err + } + region := c.GetRegion(regionID) + if region == nil { + return errs.ErrRegionNotFound(regionID) + } + + if c.IsRegionHot(region) { + return errors.Errorf("region %d is a hot region", regionID) + } + + s, err := h.GetRegionScatterer() + if err != nil { + return err + } + + op, err := s.Scatter(region, group, false) + if err != nil { + return err + } + + if op == nil { + return nil + } + return h.addOperator(op) +} + +// AddScatterRegionsOperators add operators to scatter regions and return the processed percentage and error +func (h *Handler) AddScatterRegionsOperators(regionIDs []uint64, startRawKey, endRawKey, group string, retryLimit int) (int, error) { + s, err := h.GetRegionScatterer() + if err != nil { + return 0, err + } + opsCount := 0 + var failures map[uint64]error + // If startKey and endKey are both defined, use them first. 
+ if len(startRawKey) > 0 && len(endRawKey) > 0 { + startKey, err := hex.DecodeString(startRawKey) + if err != nil { + return 0, err + } + endKey, err := hex.DecodeString(endRawKey) + if err != nil { + return 0, err + } + opsCount, failures, err = s.ScatterRegionsByRange(startKey, endKey, group, retryLimit) + if err != nil { + return 0, err + } + } else { + opsCount, failures, err = s.ScatterRegionsByID(regionIDs, group, retryLimit, false) + if err != nil { + return 0, err + } + } + percentage := 100 + if len(failures) > 0 { + percentage = 100 - 100*len(failures)/(opsCount+len(failures)) + } + return percentage, nil +} + +func (h *Handler) addOperator(ops ...*operator.Operator) error { + oc, err := h.GetOperatorController() + if err != nil { + return err + } + + if ok := oc.AddOperator(ops...); !ok { + return errors.WithStack(errs.ErrAddOperator) + } + return nil +} + +func checkStoreState(c sche.SharedCluster, storeID uint64) error { + store := c.GetStore(storeID) + if store == nil { + return errs.ErrStoreNotFound.FastGenByArgs(storeID) + } + if store.IsRemoved() { + return errs.ErrStoreRemoved.FastGenByArgs(storeID) + } + if store.IsUnhealthy() { + return errs.ErrStoreUnhealthy.FastGenByArgs(storeID) + } + return nil +} diff --git a/server/api/hot_status.go b/server/api/hot_status.go index 4f64f1bebc5..a3be72a56ac 100644 --- a/server/api/hot_status.go +++ b/server/api/hot_status.go @@ -22,6 +22,7 @@ import ( "strconv" "github.com/tikv/pd/pkg/core" + "github.com/tikv/pd/pkg/errs" "github.com/tikv/pd/pkg/statistics/buckets" "github.com/tikv/pd/pkg/statistics/utils" "github.com/tikv/pd/pkg/storage" @@ -117,7 +118,7 @@ func (h *hotStatusHandler) GetHotWriteRegions(w http.ResponseWriter, r *http.Req } store := rc.GetStore(id) if store == nil { - h.rd.JSON(w, http.StatusNotFound, server.ErrStoreNotFound(id).Error()) + h.rd.JSON(w, http.StatusNotFound, errs.ErrStoreNotFoundByID(id).Error()) return } ids = append(ids, id) @@ -153,7 +154,7 @@ func (h *hotStatusHandler) 
GetHotReadRegions(w http.ResponseWriter, r *http.Requ } store := rc.GetStore(id) if store == nil { - h.rd.JSON(w, http.StatusNotFound, server.ErrStoreNotFound(id).Error()) + h.rd.JSON(w, http.StatusNotFound, errs.ErrStoreNotFoundByID(id).Error()) return } ids = append(ids, id) diff --git a/server/api/label.go b/server/api/label.go index abaad02a4e3..115d93dfdd2 100644 --- a/server/api/label.go +++ b/server/api/label.go @@ -21,6 +21,7 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/kvproto/pkg/metapb" + "github.com/tikv/pd/pkg/errs" "github.com/tikv/pd/server" "github.com/unrolled/render" ) @@ -87,7 +88,7 @@ func (h *labelsHandler) GetStoresByLabel(w http.ResponseWriter, r *http.Request) storeID := s.GetId() store := rc.GetStore(storeID) if store == nil { - h.rd.JSON(w, http.StatusInternalServerError, server.ErrStoreNotFound(storeID).Error()) + h.rd.JSON(w, http.StatusInternalServerError, errs.ErrStoreNotFoundByID(storeID).Error()) return } diff --git a/server/api/region.go b/server/api/region.go index 1c21af53296..eaa0a29d2d2 100644 --- a/server/api/region.go +++ b/server/api/region.go @@ -33,6 +33,7 @@ import ( "github.com/pingcap/kvproto/pkg/replication_modepb" "github.com/pingcap/log" "github.com/tikv/pd/pkg/core" + "github.com/tikv/pd/pkg/errs" "github.com/tikv/pd/pkg/keyspace" "github.com/tikv/pd/pkg/schedule/filter" "github.com/tikv/pd/pkg/statistics" @@ -763,7 +764,7 @@ func (h *regionsHandler) GetRegionSiblings(w http.ResponseWriter, r *http.Reques } region := rc.GetRegion(uint64(id)) if region == nil { - h.rd.JSON(w, http.StatusNotFound, server.ErrRegionNotFound(uint64(id)).Error()) + h.rd.JSON(w, http.StatusNotFound, errs.ErrRegionNotFound(uint64(id)).Error()) return } @@ -1037,7 +1038,7 @@ func (h *regionsHandler) ScatterRegions(w http.ResponseWriter, r *http.Request) h.rd.JSON(w, http.StatusBadRequest, err.Error()) return } - opsCount, failures, err = rc.GetRegionScatter().ScatterRegionsByRange(startKey, endKey, group, retryLimit) + 
opsCount, failures, err = rc.GetRegionScatterer().ScatterRegionsByRange(startKey, endKey, group, retryLimit) if err != nil { h.rd.JSON(w, http.StatusInternalServerError, err.Error()) return @@ -1048,7 +1049,7 @@ func (h *regionsHandler) ScatterRegions(w http.ResponseWriter, r *http.Request) h.rd.JSON(w, http.StatusBadRequest, "regions_id is invalid") return } - opsCount, failures, err = rc.GetRegionScatter().ScatterRegionsByID(ids, group, retryLimit, false) + opsCount, failures, err = rc.GetRegionScatterer().ScatterRegionsByID(ids, group, retryLimit, false) if err != nil { h.rd.JSON(w, http.StatusInternalServerError, err.Error()) return diff --git a/server/api/rule.go b/server/api/rule.go index 33c63a8faa2..3188da90880 100644 --- a/server/api/rule.go +++ b/server/api/rule.go @@ -167,7 +167,7 @@ func (h *ruleHandler) preCheckForRegionAndRule(w http.ResponseWriter, r *http.Re } region := cluster.GetRegion(regionID) if region == nil { - h.rd.JSON(w, http.StatusNotFound, server.ErrRegionNotFound(regionID).Error()) + h.rd.JSON(w, http.StatusNotFound, errs.ErrRegionNotFound(regionID).Error()) return cluster, nil } return cluster, region diff --git a/server/api/store.go b/server/api/store.go index a3e8c4518a2..65e67c1eff3 100644 --- a/server/api/store.go +++ b/server/api/store.go @@ -191,7 +191,7 @@ func (h *storeHandler) GetStore(w http.ResponseWriter, r *http.Request) { store := rc.GetStore(storeID) if store == nil { - h.rd.JSON(w, http.StatusNotFound, server.ErrStoreNotFound(storeID).Error()) + h.rd.JSON(w, http.StatusNotFound, errs.ErrStoreNotFoundByID(storeID).Error()) return } @@ -437,7 +437,7 @@ func (h *storeHandler) SetStoreLimit(w http.ResponseWriter, r *http.Request) { store := rc.GetStore(storeID) if store == nil { - h.rd.JSON(w, http.StatusInternalServerError, server.ErrStoreNotFound(storeID).Error()) + h.rd.JSON(w, http.StatusInternalServerError, errs.ErrStoreNotFoundByID(storeID).Error()) return } @@ -758,7 +758,7 @@ func (h *storesHandler) 
GetAllStores(w http.ResponseWriter, r *http.Request) { storeID := s.GetId() store := rc.GetStore(storeID) if store == nil { - h.rd.JSON(w, http.StatusInternalServerError, server.ErrStoreNotFound(storeID).Error()) + h.rd.JSON(w, http.StatusInternalServerError, errs.ErrStoreNotFoundByID(storeID).Error()) return } diff --git a/server/cluster/cluster.go b/server/cluster/cluster.go index b16de73f84e..53e92297710 100644 --- a/server/cluster/cluster.go +++ b/server/cluster/cluster.go @@ -765,8 +765,8 @@ func (c *RaftCluster) SetPrepared() { c.coordinator.GetPrepareChecker().SetPrepared() } -// GetRegionScatter returns the region scatter. -func (c *RaftCluster) GetRegionScatter() *scatter.RegionScatterer { +// GetRegionScatterer returns the region scatterer. +func (c *RaftCluster) GetRegionScatterer() *scatter.RegionScatterer { return c.coordinator.GetRegionScatterer() } diff --git a/server/grpc_service.go b/server/grpc_service.go index 973c45a622f..d924e4ba8a9 100644 --- a/server/grpc_service.go +++ b/server/grpc_service.go @@ -1704,7 +1704,7 @@ func (s *GrpcServer) ScatterRegion(ctx context.Context, request *pdpb.ScatterReg region = core.NewRegionInfo(request.GetRegion(), request.GetLeader()) } - op, err := rc.GetRegionScatter().Scatter(region, request.GetGroup(), request.GetSkipStoreLimit()) + op, err := rc.GetRegionScatterer().Scatter(region, request.GetGroup(), request.GetSkipStoreLimit()) if err != nil { return nil, err } @@ -2106,7 +2106,7 @@ func (s *GrpcServer) SplitAndScatterRegions(ctx context.Context, request *pdpb.S // scatterRegions add operators to scatter regions and return the processed percentage and error func scatterRegions(cluster *cluster.RaftCluster, regionsID []uint64, group string, retryLimit int, skipStoreLimit bool) (int, error) { - opsCount, failures, err := cluster.GetRegionScatter().ScatterRegionsByID(regionsID, group, retryLimit, skipStoreLimit) + opsCount, failures, err := cluster.GetRegionScatterer().ScatterRegionsByID(regionsID, group, 
retryLimit, skipStoreLimit) if err != nil { return 0, err } diff --git a/server/handler.go b/server/handler.go index adc1e8ecd31..97446f17354 100644 --- a/server/handler.go +++ b/server/handler.go @@ -15,19 +15,15 @@ package server import ( - "bytes" - "encoding/hex" "encoding/json" "net/http" "net/url" "path" "strconv" - "strings" "time" "github.com/pingcap/errors" "github.com/pingcap/kvproto/pkg/metapb" - "github.com/pingcap/kvproto/pkg/pdpb" "github.com/pingcap/log" "github.com/tikv/pd/pkg/core" "github.com/tikv/pd/pkg/core/storelimit" @@ -35,9 +31,8 @@ import ( "github.com/tikv/pd/pkg/errs" "github.com/tikv/pd/pkg/schedule" sc "github.com/tikv/pd/pkg/schedule/config" - "github.com/tikv/pd/pkg/schedule/filter" - "github.com/tikv/pd/pkg/schedule/operator" - "github.com/tikv/pd/pkg/schedule/placement" + sche "github.com/tikv/pd/pkg/schedule/core" + "github.com/tikv/pd/pkg/schedule/handler" "github.com/tikv/pd/pkg/schedule/schedulers" "github.com/tikv/pd/pkg/statistics" "github.com/tikv/pd/pkg/statistics/buckets" @@ -50,36 +45,24 @@ import ( "go.uber.org/zap" ) -var ( - // SchedulerConfigHandlerPath is the api router path of the schedule config handler. - SchedulerConfigHandlerPath = "/api/v1/scheduler-config" - - // ErrOperatorNotFound is error info for operator not found. - ErrOperatorNotFound = errors.New("operator not found") - // ErrAddOperator is error info for already have an operator when adding operator. - ErrAddOperator = errors.New("failed to add operator, maybe already have one") - // ErrRegionNotAdjacent is error info for region not adjacent. - ErrRegionNotAdjacent = errors.New("two regions are not adjacent") - // ErrRegionNotFound is error info for region not found. - ErrRegionNotFound = func(regionID uint64) error { - return errors.Errorf("region %v not found", regionID) - } - // ErrRegionAbnormalPeer is error info for region has abnormal peer. 
- ErrRegionAbnormalPeer = func(regionID uint64) error { - return errors.Errorf("region %v has abnormal peer", regionID) - } - // ErrStoreNotFound is error info for store not found. - ErrStoreNotFound = func(storeID uint64) error { - return errors.Errorf("store %v not found", storeID) - } - // ErrPluginNotFound is error info for plugin not found. - ErrPluginNotFound = func(pluginPath string) error { - return errors.Errorf("plugin is not found: %s", pluginPath) - } -) +// SchedulerConfigHandlerPath is the api router path of the schedule config handler. +var SchedulerConfigHandlerPath = "/api/v1/scheduler-config" + +type server struct { + *Server +} + +func (s *server) GetCoordinator() (*schedule.Coordinator, error) { + return s.GetRaftCluster().GetCoordinator(), nil +} + +func (s *server) GetCluster() (sche.SharedCluster, error) { + return s.GetRaftCluster(), nil +} // Handler is a helper to export methods to handle API/RPC requests. type Handler struct { + *handler.Handler s *Server opt *config.PersistOptions pluginChMap map[string]chan string @@ -87,7 +70,16 @@ type Handler struct { } func newHandler(s *Server) *Handler { - return &Handler{s: s, opt: s.persistOptions, pluginChMap: make(map[string]chan string), pluginChMapLock: syncutil.RWMutex{}} + h := handler.NewHandler(&server{ + Server: s, + }) + return &Handler{ + Handler: h, + s: s, + opt: s.persistOptions, + pluginChMap: make(map[string]chan string), + pluginChMapLock: syncutil.RWMutex{}, + } } // GetRaftCluster returns RaftCluster. @@ -99,15 +91,6 @@ func (h *Handler) GetRaftCluster() (*cluster.RaftCluster, error) { return rc, nil } -// GetOperatorController returns OperatorController. -func (h *Handler) GetOperatorController() (*operator.Controller, error) { - rc := h.s.GetRaftCluster() - if rc == nil { - return nil, errs.ErrNotBootstrapped.GenWithStackByArgs() - } - return rc.GetOperatorController(), nil -} - // IsSchedulerPaused returns whether scheduler is paused. 
func (h *Handler) IsSchedulerPaused(name string) (bool, error) { rc, err := h.GetRaftCluster() @@ -170,7 +153,7 @@ func (h *Handler) GetStores() ([]*core.StoreInfo, error) { storeID := s.GetId() store := rc.GetStore(storeID) if store == nil { - return nil, ErrStoreNotFound(storeID) + return nil, errs.ErrStoreNotFoundByID(storeID) } stores = append(stores, store) } @@ -407,119 +390,6 @@ func (h *Handler) AddGrantHotRegionScheduler(leaderID, peers string) error { return h.AddScheduler(schedulers.GrantHotRegionType, leaderID, peers) } -// GetOperator returns the region operator. -func (h *Handler) GetOperator(regionID uint64) (*operator.Operator, error) { - c, err := h.GetOperatorController() - if err != nil { - return nil, err - } - - op := c.GetOperator(regionID) - if op == nil { - return nil, ErrOperatorNotFound - } - - return op, nil -} - -// GetOperatorStatus returns the status of the region operator. -func (h *Handler) GetOperatorStatus(regionID uint64) (*operator.OpWithStatus, error) { - c, err := h.GetOperatorController() - if err != nil { - return nil, err - } - - op := c.GetOperatorStatus(regionID) - if op == nil { - return nil, ErrOperatorNotFound - } - - return op, nil -} - -// RemoveOperator removes the region operator. -func (h *Handler) RemoveOperator(regionID uint64) error { - c, err := h.GetOperatorController() - if err != nil { - return err - } - - op := c.GetOperator(regionID) - if op == nil { - return ErrOperatorNotFound - } - - _ = c.RemoveOperator(op, operator.AdminStop) - return nil -} - -// GetOperators returns the running operators. -func (h *Handler) GetOperators() ([]*operator.Operator, error) { - c, err := h.GetOperatorController() - if err != nil { - return nil, err - } - return c.GetOperators(), nil -} - -// GetWaitingOperators returns the waiting operators. 
-func (h *Handler) GetWaitingOperators() ([]*operator.Operator, error) { - c, err := h.GetOperatorController() - if err != nil { - return nil, err - } - return c.GetWaitingOperators(), nil -} - -// GetAdminOperators returns the running admin operators. -func (h *Handler) GetAdminOperators() ([]*operator.Operator, error) { - c, err := h.GetOperatorController() - if err != nil { - return nil, err - } - return c.GetOperatorsOfKind(operator.OpAdmin), nil -} - -// GetLeaderOperators returns the running leader operators. -func (h *Handler) GetLeaderOperators() ([]*operator.Operator, error) { - c, err := h.GetOperatorController() - if err != nil { - return nil, err - } - return c.GetOperatorsOfKind(operator.OpLeader), nil -} - -// GetRegionOperators returns the running region operators. -func (h *Handler) GetRegionOperators() ([]*operator.Operator, error) { - c, err := h.GetOperatorController() - if err != nil { - return nil, err - } - return c.GetOperatorsOfKind(operator.OpRegion), nil -} - -// GetHistory returns finished operators' history since start. -func (h *Handler) GetHistory(start time.Time) ([]operator.OpHistory, error) { - c, err := h.GetOperatorController() - if err != nil { - return nil, err - } - return c.GetHistory(start), nil -} - -// GetRecords returns finished operators since start. -func (h *Handler) GetRecords(from time.Time) ([]*operator.OpRecord, error) { - c, err := h.GetOperatorController() - if err != nil { - return nil, err - } - records := c.GetRecords(from) - if len(records) == 0 { - return nil, ErrOperatorNotFound - } - return records, nil -} - // SetAllStoresLimit is used to set limit of all stores. 
func (h *Handler) SetAllStoresLimit(ratePerMin float64, limitType storelimit.Type) error { c, err := h.GetRaftCluster() @@ -576,349 +446,6 @@ func (h *Handler) SetStoreLimit(storeID uint64, ratePerMin float64, limitType st return c.SetStoreLimit(storeID, limitType, ratePerMin) } -// AddTransferLeaderOperator adds an operator to transfer leader to the store. -func (h *Handler) AddTransferLeaderOperator(regionID uint64, storeID uint64) error { - c, err := h.GetRaftCluster() - if err != nil { - return err - } - - region := c.GetRegion(regionID) - if region == nil { - return ErrRegionNotFound(regionID) - } - - newLeader := region.GetStoreVoter(storeID) - if newLeader == nil { - return errors.Errorf("region has no voter in store %v", storeID) - } - - op, err := operator.CreateTransferLeaderOperator("admin-transfer-leader", c, region, region.GetLeader().GetStoreId(), newLeader.GetStoreId(), []uint64{}, operator.OpAdmin) - if err != nil { - log.Debug("fail to create transfer leader operator", errs.ZapError(err)) - return err - } - if ok := c.GetOperatorController().AddOperator(op); !ok { - return errors.WithStack(ErrAddOperator) - } - return nil -} - -// AddTransferRegionOperator adds an operator to transfer region to the stores. -func (h *Handler) AddTransferRegionOperator(regionID uint64, storeIDs map[uint64]placement.PeerRoleType) error { - c, err := h.GetRaftCluster() - if err != nil { - return err - } - - region := c.GetRegion(regionID) - if region == nil { - return ErrRegionNotFound(regionID) - } - - if c.GetOpts().IsPlacementRulesEnabled() { - // Cannot determine role without peer role when placement rules enabled. Not supported now. 
- for _, role := range storeIDs { - if len(role) == 0 { - return errors.New("transfer region without peer role is not supported when placement rules enabled") - } - } - } - for id := range storeIDs { - if err := checkStoreState(c, id); err != nil { - return err - } - } - - roles := make(map[uint64]placement.PeerRoleType) - for id, peerRole := range storeIDs { - if peerRole == "" { - peerRole = placement.Voter - } - roles[id] = peerRole - } - op, err := operator.CreateMoveRegionOperator("admin-move-region", c, region, operator.OpAdmin, roles) - if err != nil { - log.Debug("fail to create move region operator", errs.ZapError(err)) - return err - } - if ok := c.GetOperatorController().AddOperator(op); !ok { - return errors.WithStack(ErrAddOperator) - } - return nil -} - -// AddTransferPeerOperator adds an operator to transfer peer. -func (h *Handler) AddTransferPeerOperator(regionID uint64, fromStoreID, toStoreID uint64) error { - c, err := h.GetRaftCluster() - if err != nil { - return err - } - - region := c.GetRegion(regionID) - if region == nil { - return ErrRegionNotFound(regionID) - } - - oldPeer := region.GetStorePeer(fromStoreID) - if oldPeer == nil { - return errors.Errorf("region has no peer in store %v", fromStoreID) - } - - if err := checkStoreState(c, toStoreID); err != nil { - return err - } - - newPeer := &metapb.Peer{StoreId: toStoreID, Role: oldPeer.GetRole(), IsWitness: oldPeer.GetIsWitness()} - op, err := operator.CreateMovePeerOperator("admin-move-peer", c, region, operator.OpAdmin, fromStoreID, newPeer) - if err != nil { - log.Debug("fail to create move peer operator", errs.ZapError(err)) - return err - } - if ok := c.GetOperatorController().AddOperator(op); !ok { - return errors.WithStack(ErrAddOperator) - } - return nil -} - -// checkAdminAddPeerOperator checks adminAddPeer operator with given region ID and store ID. 
-func (h *Handler) checkAdminAddPeerOperator(regionID uint64, toStoreID uint64) (*cluster.RaftCluster, *core.RegionInfo, error) { - c, err := h.GetRaftCluster() - if err != nil { - return nil, nil, err - } - - region := c.GetRegion(regionID) - if region == nil { - return nil, nil, ErrRegionNotFound(regionID) - } - - if region.GetStorePeer(toStoreID) != nil { - return nil, nil, errors.Errorf("region already has peer in store %v", toStoreID) - } - - if err := checkStoreState(c, toStoreID); err != nil { - return nil, nil, err - } - - return c, region, nil -} - -// AddAddPeerOperator adds an operator to add peer. -func (h *Handler) AddAddPeerOperator(regionID uint64, toStoreID uint64) error { - c, region, err := h.checkAdminAddPeerOperator(regionID, toStoreID) - if err != nil { - return err - } - - newPeer := &metapb.Peer{StoreId: toStoreID} - op, err := operator.CreateAddPeerOperator("admin-add-peer", c, region, newPeer, operator.OpAdmin) - if err != nil { - log.Debug("fail to create add peer operator", errs.ZapError(err)) - return err - } - if ok := c.GetOperatorController().AddOperator(op); !ok { - return errors.WithStack(ErrAddOperator) - } - return nil -} - -// AddAddLearnerOperator adds an operator to add learner. -func (h *Handler) AddAddLearnerOperator(regionID uint64, toStoreID uint64) error { - c, region, err := h.checkAdminAddPeerOperator(regionID, toStoreID) - if err != nil { - return err - } - - newPeer := &metapb.Peer{ - StoreId: toStoreID, - Role: metapb.PeerRole_Learner, - } - - op, err := operator.CreateAddPeerOperator("admin-add-learner", c, region, newPeer, operator.OpAdmin) - if err != nil { - log.Debug("fail to create add learner operator", errs.ZapError(err)) - return err - } - if ok := c.GetOperatorController().AddOperator(op); !ok { - return errors.WithStack(ErrAddOperator) - } - return nil -} - -// AddRemovePeerOperator adds an operator to remove peer. 
-func (h *Handler) AddRemovePeerOperator(regionID uint64, fromStoreID uint64) error { - c, err := h.GetRaftCluster() - if err != nil { - return err - } - - region := c.GetRegion(regionID) - if region == nil { - return ErrRegionNotFound(regionID) - } - - if region.GetStorePeer(fromStoreID) == nil { - return errors.Errorf("region has no peer in store %v", fromStoreID) - } - - op, err := operator.CreateRemovePeerOperator("admin-remove-peer", c, operator.OpAdmin, region, fromStoreID) - if err != nil { - log.Debug("fail to create move peer operator", errs.ZapError(err)) - return err - } - if ok := c.GetOperatorController().AddOperator(op); !ok { - return errors.WithStack(ErrAddOperator) - } - return nil -} - -// AddMergeRegionOperator adds an operator to merge region. -func (h *Handler) AddMergeRegionOperator(regionID uint64, targetID uint64) error { - c, err := h.GetRaftCluster() - if err != nil { - return err - } - - region := c.GetRegion(regionID) - if region == nil { - return ErrRegionNotFound(regionID) - } - - target := c.GetRegion(targetID) - if target == nil { - return ErrRegionNotFound(targetID) - } - - if !filter.IsRegionHealthy(region) || !filter.IsRegionReplicated(c, region) { - return ErrRegionAbnormalPeer(regionID) - } - - if !filter.IsRegionHealthy(target) || !filter.IsRegionReplicated(c, target) { - return ErrRegionAbnormalPeer(targetID) - } - - // for the case first region (start key is nil) with the last region (end key is nil) but not adjacent - if (!bytes.Equal(region.GetStartKey(), target.GetEndKey()) || len(region.GetStartKey()) == 0) && - (!bytes.Equal(region.GetEndKey(), target.GetStartKey()) || len(region.GetEndKey()) == 0) { - return ErrRegionNotAdjacent - } - - ops, err := operator.CreateMergeRegionOperator("admin-merge-region", c, region, target, operator.OpAdmin) - if err != nil { - log.Debug("fail to create merge region operator", errs.ZapError(err)) - return err - } - if ok := c.GetOperatorController().AddOperator(ops...); !ok { - return 
errors.WithStack(ErrAddOperator) - } - return nil -} - -// AddSplitRegionOperator adds an operator to split a region. -func (h *Handler) AddSplitRegionOperator(regionID uint64, policyStr string, keys []string) error { - c, err := h.GetRaftCluster() - if err != nil { - return err - } - - region := c.GetRegion(regionID) - if region == nil { - return ErrRegionNotFound(regionID) - } - - policy, ok := pdpb.CheckPolicy_value[strings.ToUpper(policyStr)] - if !ok { - return errors.Errorf("check policy %s is not supported", policyStr) - } - - var splitKeys [][]byte - if pdpb.CheckPolicy(policy) == pdpb.CheckPolicy_USEKEY { - for i := range keys { - k, err := hex.DecodeString(keys[i]) - if err != nil { - return errors.Errorf("split key %s is not in hex format", keys[i]) - } - splitKeys = append(splitKeys, k) - } - } - - op, err := operator.CreateSplitRegionOperator("admin-split-region", region, operator.OpAdmin, pdpb.CheckPolicy(policy), splitKeys) - if err != nil { - return err - } - - if ok := c.GetOperatorController().AddOperator(op); !ok { - return errors.WithStack(ErrAddOperator) - } - return nil -} - -// AddScatterRegionOperator adds an operator to scatter a region. 
-func (h *Handler) AddScatterRegionOperator(regionID uint64, group string) error { - c, err := h.GetRaftCluster() - if err != nil { - return err - } - - region := c.GetRegion(regionID) - if region == nil { - return ErrRegionNotFound(regionID) - } - - if c.IsRegionHot(region) { - return errors.Errorf("region %d is a hot region", regionID) - } - - op, err := c.GetRegionScatter().Scatter(region, group, false) - if err != nil { - return err - } - - if op == nil { - return nil - } - if ok := c.GetOperatorController().AddOperator(op); !ok { - return errors.WithStack(ErrAddOperator) - } - return nil -} - -// AddScatterRegionsOperators add operators to scatter regions and return the processed percentage and error -func (h *Handler) AddScatterRegionsOperators(regionIDs []uint64, startRawKey, endRawKey, group string, retryLimit int) (int, error) { - c, err := h.GetRaftCluster() - if err != nil { - return 0, err - } - opsCount := 0 - var failures map[uint64]error - // If startKey and endKey are both defined, use them first. - if len(startRawKey) > 0 && len(endRawKey) > 0 { - startKey, err := hex.DecodeString(startRawKey) - if err != nil { - return 0, err - } - endKey, err := hex.DecodeString(endRawKey) - if err != nil { - return 0, err - } - opsCount, failures, err = c.GetRegionScatter().ScatterRegionsByRange(startKey, endKey, group, retryLimit) - if err != nil { - return 0, err - } - } else { - opsCount, failures, err = c.GetRegionScatter().ScatterRegionsByID(regionIDs, group, retryLimit, false) - if err != nil { - return 0, err - } - } - percentage := 100 - if len(failures) > 0 { - percentage = 100 - 100*len(failures)/(opsCount+len(failures)) - } - return percentage, nil -} - // GetRegionsByType gets the region with specified type. 
func (h *Handler) GetRegionsByType(typ statistics.RegionStatisticType) ([]*core.RegionInfo, error) { c := h.s.GetRaftCluster() @@ -1010,7 +537,7 @@ func (h *Handler) PluginUnload(pluginPath string) error { ch <- schedule.PluginUnload return nil } - return ErrPluginNotFound(pluginPath) + return errs.ErrPluginNotFound(pluginPath) } // GetAddr returns the server urls for clients. @@ -1101,20 +628,6 @@ func (h *Handler) GetHistoryHotRegionIter( return iter } -func checkStoreState(rc *cluster.RaftCluster, storeID uint64) error { - store := rc.GetStore(storeID) - if store == nil { - return errs.ErrStoreNotFound.FastGenByArgs(storeID) - } - if store.IsRemoved() { - return errs.ErrStoreRemoved.FastGenByArgs(storeID) - } - if store.IsUnhealthy() { - return errs.ErrStoreUnhealthy.FastGenByArgs(storeID) - } - return nil -} - // RedirectSchedulerUpdate update scheduler config. Export this func to help handle damaged store. func (h *Handler) redirectSchedulerUpdate(name string, storeID float64) error { input := make(map[string]interface{})