From 3b59a7888d66613aa62657e7e76a38e550ccb1c7 Mon Sep 17 00:00:00 2001 From: Ryan Leung Date: Thu, 29 Jun 2023 16:11:18 +0800 Subject: [PATCH 1/8] tools: support get all groups (#6714) ref tikv/pd#5895, ref tikv/pd#6706 Signed-off-by: Ryan Leung --- tests/pdctl/keyspace/keyspace_group_test.go | 12 ++++++++++++ .../pdctl/command/keyspace_group_command.go | 16 +++++++++------- tools/pd-ctl/pdctl/command/store_command.go | 1 - 3 files changed, 21 insertions(+), 8 deletions(-) diff --git a/tests/pdctl/keyspace/keyspace_group_test.go b/tests/pdctl/keyspace/keyspace_group_test.go index d3c478da7c6..c982dc140af 100644 --- a/tests/pdctl/keyspace/keyspace_group_test.go +++ b/tests/pdctl/keyspace/keyspace_group_test.go @@ -128,6 +128,18 @@ func TestSplitKeyspaceGroup(t *testing.T) { return strings.Contains(string(output), "Success") }) + // get all keyspaces + args := []string{"-u", pdAddr, "keyspace-group"} + output, err := pdctl.ExecuteCommand(cmd, args...) + re.NoError(err) + strings.Contains(string(output), "Success") + var keyspaceGroups []*endpoint.KeyspaceGroup + err = json.Unmarshal(output, &keyspaceGroups) + re.NoError(err) + re.Len(keyspaceGroups, 2) + re.Equal(keyspaceGroups[0].ID, uint32(0)) + re.Equal(keyspaceGroups[1].ID, uint32(1)) + re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/keyspace/acceleratedAllocNodes")) re.NoError(failpoint.Disable("github.com/tikv/pd/server/delayStartServerLoop")) } diff --git a/tools/pd-ctl/pdctl/command/keyspace_group_command.go b/tools/pd-ctl/pdctl/command/keyspace_group_command.go index 246bc2a60da..a4be612a301 100644 --- a/tools/pd-ctl/pdctl/command/keyspace_group_command.go +++ b/tools/pd-ctl/pdctl/command/keyspace_group_command.go @@ -132,13 +132,15 @@ func showKeyspaceGroupsCommandFunc(cmd *cobra.Command, args []string) { cmd.Printf("Failed to get state: %s\n", err) } stateValue := "" - state = strings.ToLower(state) - switch state { - case "merge", "split": - stateValue = fmt.Sprintf("state=%v", state) - default: - 
cmd.Println("Unknown state: " + state) - return + if len(state) != 0 { + state = strings.ToLower(state) + switch state { + case "merge", "split": + stateValue = fmt.Sprintf("state=%v", state) + default: + cmd.Println("Unknown state: " + state) + return + } } if len(stateValue) != 0 { diff --git a/tools/pd-ctl/pdctl/command/store_command.go b/tools/pd-ctl/pdctl/command/store_command.go index 79ed9673e83..1dee1c13a72 100644 --- a/tools/pd-ctl/pdctl/command/store_command.go +++ b/tools/pd-ctl/pdctl/command/store_command.go @@ -565,7 +565,6 @@ func labelStoreCommandFunc(cmd *cobra.Command, args []string) { } else if rewrite, _ := cmd.Flags().GetBool("rewrite"); rewrite { prefix += "?force=true" } - cmd.Println(prefix) postJSON(cmd, prefix, labels) } From fa721e7c25d3eb05e5a3eadfd50b1f686d62a8a1 Mon Sep 17 00:00:00 2001 From: Hu# Date: Thu, 29 Jun 2023 19:12:48 +0800 Subject: [PATCH 2/8] api: remove redundant code (#6716) ref tikv/pd#4399 Signed-off-by: husharp --- server/api/router.go | 1 - 1 file changed, 1 deletion(-) diff --git a/server/api/router.go b/server/api/router.go index d3650de09ee..0fc77ba6b19 100644 --- a/server/api/router.go +++ b/server/api/router.go @@ -297,7 +297,6 @@ func createRouter(prefix string, svr *server.Server) *mux.Router { registerFunc(clusterRouter, "/admin/cache/region/{id}", adminHandler.DeleteRegionCache, setMethods(http.MethodDelete), setAuditBackend(localLog, prometheus)) registerFunc(clusterRouter, "/admin/cache/regions", adminHandler.DeleteAllRegionCache, setMethods(http.MethodDelete), setAuditBackend(localLog, prometheus)) registerFunc(apiRouter, "/admin/persist-file/{file_name}", adminHandler.SavePersistFile, setMethods(http.MethodPost), setAuditBackend(localLog, prometheus)) - registerFunc(apiRouter, "/admin/persist-file/{file_name}", adminHandler.SavePersistFile, setMethods(http.MethodPost), setAuditBackend(localLog, prometheus)) registerFunc(apiRouter, "/admin/cluster/markers/snapshot-recovering", 
adminHandler.IsSnapshotRecovering, setMethods(http.MethodGet), setAuditBackend(localLog, prometheus)) registerFunc(apiRouter, "/admin/cluster/markers/snapshot-recovering", adminHandler.MarkSnapshotRecovering, setMethods(http.MethodPost), setAuditBackend(localLog, prometheus)) registerFunc(apiRouter, "/admin/cluster/markers/snapshot-recovering", adminHandler.UnmarkSnapshotRecovering, setMethods(http.MethodDelete), setAuditBackend(localLog, prometheus)) From c586e5c0a21bbf7c82ee53ad45caeb8e7d4a9776 Mon Sep 17 00:00:00 2001 From: Bin Shi <39923490+binshi-bing@users.noreply.github.com> Date: Thu, 29 Jun 2023 20:44:11 -0700 Subject: [PATCH 3/8] Fix data race between read APIs and finshiSplit/finishMerge in keyspace group manager (#6723) close tikv/pd#6721 checkTSOMerge and checkTSOSplit will read from kgm.getKeyspaceGroupMeta finishMergeKeyspaceGroup and finishSplitKeyspaceGroup will update kgm so just return a copy to avoid data race Signed-off-by: Bin Shi --- pkg/tso/keyspace_group_manager.go | 98 +++++++++++++++++++------------ 1 file changed, 60 insertions(+), 38 deletions(-) diff --git a/pkg/tso/keyspace_group_manager.go b/pkg/tso/keyspace_group_manager.go index a82376430fa..0291bc5863d 100644 --- a/pkg/tso/keyspace_group_manager.go +++ b/pkg/tso/keyspace_group_manager.go @@ -112,6 +112,39 @@ func (s *state) getKeyspaceGroupMeta( return s.ams[groupID], s.kgs[groupID] } +func (s *state) checkTSOSplit( + targetGroupID uint32, +) (splitTargetAM, splitSourceAM *AllocatorManager, err error) { + s.RLock() + defer s.RUnlock() + splitTargetAM, splitTargetGroup := s.ams[targetGroupID], s.kgs[targetGroupID] + // Only the split target keyspace group needs to check the TSO split. 
+ if !splitTargetGroup.IsSplitTarget() { + return nil, nil, nil // it isn't in the split state + } + sourceGroupID := splitTargetGroup.SplitSource() + splitSourceAM, splitSourceGroup := s.ams[sourceGroupID], s.kgs[sourceGroupID] + if splitSourceAM == nil || splitSourceGroup == nil { + log.Error("the split source keyspace group is not initialized", + zap.Uint32("source", sourceGroupID)) + return nil, nil, errs.ErrKeyspaceGroupNotInitialized.FastGenByArgs(sourceGroupID) + } + return splitTargetAM, splitSourceAM, nil +} + +// Reject any request if the keyspace group is in merging state, +// we need to wait for the merging checker to finish the TSO merging. +func (s *state) checkTSOMerge( + groupID uint32, +) error { + s.RLock() + defer s.RUnlock() + if s.kgs[groupID] == nil || !s.kgs[groupID].IsMerging() { + return nil + } + return errs.ErrKeyspaceGroupIsMerging.FastGenByArgs(groupID) +} + // getKeyspaceGroupMetaWithCheck returns the keyspace group meta of the given keyspace. // It also checks if the keyspace is served by the given keyspace group. If not, it returns the meta // of the keyspace group to which the keyspace currently belongs and returns NotServed (by the given @@ -957,7 +990,7 @@ func (kgm *KeyspaceGroupManager) HandleTSORequest( if err != nil { return pdpb.Timestamp{}, curKeyspaceGroupID, err } - err = kgm.checkTSOMerge(curKeyspaceGroupID) + err = kgm.state.checkTSOMerge(curKeyspaceGroupID) if err != nil { return pdpb.Timestamp{}, curKeyspaceGroupID, err } @@ -1032,19 +1065,11 @@ func (kgm *KeyspaceGroupManager) checkTSOSplit( keyspaceGroupID uint32, dcLocation string, ) error { - splitAM, splitGroup := kgm.getKeyspaceGroupMeta(keyspaceGroupID) - // Only the split target keyspace group needs to check the TSO split. 
- if !splitGroup.IsSplitTarget() { - return nil - } - splitSource := splitGroup.SplitSource() - splitSourceAM, splitSourceGroup := kgm.getKeyspaceGroupMeta(splitSource) - if splitSourceAM == nil || splitSourceGroup == nil { - log.Error("the split source keyspace group is not initialized", - zap.Uint32("source", splitSource)) - return errs.ErrKeyspaceGroupNotInitialized.FastGenByArgs(splitSource) + splitTargetAM, splitSourceAM, err := kgm.state.checkTSOSplit(keyspaceGroupID) + if err != nil || splitTargetAM == nil { + return err } - splitAllocator, err := splitAM.GetAllocator(dcLocation) + splitTargetAllocator, err := splitTargetAM.GetAllocator(dcLocation) if err != nil { return err } @@ -1052,7 +1077,7 @@ func (kgm *KeyspaceGroupManager) checkTSOSplit( if err != nil { return err } - splitTSO, err := splitAllocator.GenerateTSO(1) + splitTargetTSO, err := splitTargetAllocator.GenerateTSO(1) if err != nil { return err } @@ -1061,19 +1086,19 @@ func (kgm *KeyspaceGroupManager) checkTSOSplit( return err } // If the split source TSO is not greater than the newly split TSO, we don't need to do anything. - if tsoutil.CompareTimestamp(&splitSourceTSO, &splitTSO) <= 0 { + if tsoutil.CompareTimestamp(&splitSourceTSO, &splitTargetTSO) <= 0 { log.Info("the split source tso is less than the newly split tso", zap.Int64("split-source-tso-physical", splitSourceTSO.Physical), zap.Int64("split-source-tso-logical", splitSourceTSO.Logical), - zap.Int64("split-tso-physical", splitTSO.Physical), - zap.Int64("split-tso-logical", splitTSO.Logical)) + zap.Int64("split-tso-physical", splitTargetTSO.Physical), + zap.Int64("split-tso-logical", splitTargetTSO.Logical)) // Finish the split state directly. return kgm.finishSplitKeyspaceGroup(keyspaceGroupID) } // If the split source TSO is greater than the newly split TSO, we need to update the split // TSO to make sure the following TSO will be greater than the split keyspaces ever had // in the past. 
- err = splitAllocator.SetTSO(tsoutil.GenerateTS(&pdpb.Timestamp{ + err = splitTargetAllocator.SetTSO(tsoutil.GenerateTS(&pdpb.Timestamp{ Physical: splitSourceTSO.Physical + 1, Logical: splitSourceTSO.Logical, }), true, true) @@ -1083,8 +1108,8 @@ func (kgm *KeyspaceGroupManager) checkTSOSplit( log.Info("the split source tso is greater than the newly split tso", zap.Int64("split-source-tso-physical", splitSourceTSO.Physical), zap.Int64("split-source-tso-logical", splitSourceTSO.Logical), - zap.Int64("split-tso-physical", splitTSO.Physical), - zap.Int64("split-tso-logical", splitTSO.Logical)) + zap.Int64("split-tso-physical", splitTargetTSO.Physical), + zap.Int64("split-tso-logical", splitTargetTSO.Logical)) // Finish the split state. return kgm.finishSplitKeyspaceGroup(keyspaceGroupID) } @@ -1116,9 +1141,13 @@ func (kgm *KeyspaceGroupManager) finishSplitKeyspaceGroup(id uint32) error { zap.Int("status-code", statusCode)) return errs.ErrSendRequest.FastGenByArgs() } - // Pre-update the split keyspace group split state in memory. - splitGroup.SplitState = nil - kgm.kgs[id] = splitGroup + // Pre-update the split keyspace group's split state in memory. + // Note: to avoid data race with state read APIs, we always replace the group in memory as a whole. + // For now, we only have scenarios to update split state/merge state, and the other fields are always + // loaded from etcd without any modification, so we can simply copy the group and replace the state. + newSplitGroup := *splitGroup + newSplitGroup.SplitState = nil + kgm.kgs[id] = &newSplitGroup return nil } @@ -1146,9 +1175,14 @@ func (kgm *KeyspaceGroupManager) finishMergeKeyspaceGroup(id uint32) error { zap.Int("status-code", statusCode)) return errs.ErrSendRequest.FastGenByArgs() } - // Pre-update the split keyspace group split state in memory. - mergeTarget.MergeState = nil - kgm.kgs[id] = mergeTarget + + // Pre-update the merge target keyspace group's merge state in memory. 
+ // Note: to avoid data race with state read APIs, we always replace the group in memory as a whole. + // For now, we only have scenarios to update split state/merge state, and the other fields are always + // loaded from etcd without any modification, so we can simply copy the group and replace the state. + newTargetGroup := *mergeTarget + newTargetGroup.MergeState = nil + kgm.kgs[id] = &newTargetGroup return nil } @@ -1286,15 +1320,3 @@ func (kgm *KeyspaceGroupManager) mergingChecker(ctx context.Context, mergeTarget return } } - -// Reject any request if the keyspace group is in merging state, -// we need to wait for the merging checker to finish the TSO merging. -func (kgm *KeyspaceGroupManager) checkTSOMerge( - keyspaceGroupID uint32, -) error { - _, group := kgm.getKeyspaceGroupMeta(keyspaceGroupID) - if !group.IsMerging() { - return nil - } - return errs.ErrKeyspaceGroupIsMerging.FastGenByArgs(keyspaceGroupID) -} From 0fe5eb40ba38c1139b9cc0551a2b3a22982fe356 Mon Sep 17 00:00:00 2001 From: lhy1024 Date: Fri, 30 Jun 2023 16:32:12 +0800 Subject: [PATCH 4/8] tso: fix memory leak introduced by timer.After (#6730) close tikv/pd#6719, ref tikv/pd#6720 Signed-off-by: lhy1024 Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com> --- pkg/timerpool/pool.go | 43 ++++++++++++++++++ pkg/timerpool/pool_test.go | 70 +++++++++++++++++++++++++++++ pkg/utils/tsoutil/tso_dispatcher.go | 17 ++++--- server/grpc_service.go | 12 +++-- 4 files changed, 133 insertions(+), 9 deletions(-) create mode 100644 pkg/timerpool/pool.go create mode 100644 pkg/timerpool/pool_test.go diff --git a/pkg/timerpool/pool.go b/pkg/timerpool/pool.go new file mode 100644 index 00000000000..28ffacfc629 --- /dev/null +++ b/pkg/timerpool/pool.go @@ -0,0 +1,43 @@ +// Copyright 2020 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. 
+ +// Note: This file is copied from https://go-review.googlesource.com/c/go/+/276133 + +package timerpool + +import ( + "sync" + "time" +) + +// GlobalTimerPool is a global pool for reusing *time.Timer. +var GlobalTimerPool TimerPool + +// TimerPool is a wrapper of sync.Pool which caches *time.Timer for reuse. +type TimerPool struct { + pool sync.Pool +} + +// Get returns a timer with a given duration. +func (tp *TimerPool) Get(d time.Duration) *time.Timer { + if v := tp.pool.Get(); v != nil { + timer := v.(*time.Timer) + timer.Reset(d) + return timer + } + return time.NewTimer(d) +} + +// Put tries to call timer.Stop() before putting it back into pool, +// if the timer.Stop() returns false (it has either already expired or been stopped), +// have a shot at draining the channel with residual time if there is one. +func (tp *TimerPool) Put(timer *time.Timer) { + if !timer.Stop() { + select { + case <-timer.C: + default: + } + } + tp.pool.Put(timer) +} diff --git a/pkg/timerpool/pool_test.go b/pkg/timerpool/pool_test.go new file mode 100644 index 00000000000..d6dffc723a9 --- /dev/null +++ b/pkg/timerpool/pool_test.go @@ -0,0 +1,70 @@ +// Copyright 2020 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. 
+ +// Note: This file is copied from https://go-review.googlesource.com/c/go/+/276133 + +package timerpool + +import ( + "testing" + "time" +) + +func TestTimerPool(t *testing.T) { + var tp TimerPool + + for i := 0; i < 100; i++ { + timer := tp.Get(20 * time.Millisecond) + + select { + case <-timer.C: + t.Errorf("timer expired too early") + continue + default: + } + + select { + case <-time.After(100 * time.Millisecond): + t.Errorf("timer didn't expire on time") + case <-timer.C: + } + + tp.Put(timer) + } +} + +const timeout = 10 * time.Millisecond + +func BenchmarkTimerUtilization(b *testing.B) { + b.Run("TimerWithPool", func(b *testing.B) { + for i := 0; i < b.N; i++ { + t := GlobalTimerPool.Get(timeout) + GlobalTimerPool.Put(t) + } + }) + b.Run("TimerWithoutPool", func(b *testing.B) { + for i := 0; i < b.N; i++ { + t := time.NewTimer(timeout) + t.Stop() + } + }) +} + +func BenchmarkTimerPoolParallel(b *testing.B) { + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + t := GlobalTimerPool.Get(timeout) + GlobalTimerPool.Put(t) + } + }) +} + +func BenchmarkTimerNativeParallel(b *testing.B) { + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + t := time.NewTimer(timeout) + t.Stop() + } + }) +} diff --git a/pkg/utils/tsoutil/tso_dispatcher.go b/pkg/utils/tsoutil/tso_dispatcher.go index 69baf4b1e41..f9585ba5cdd 100644 --- a/pkg/utils/tsoutil/tso_dispatcher.go +++ b/pkg/utils/tsoutil/tso_dispatcher.go @@ -24,6 +24,7 @@ import ( "github.com/pingcap/log" "github.com/prometheus/client_golang/prometheus" "github.com/tikv/pd/pkg/errs" + "github.com/tikv/pd/pkg/timerpool" "github.com/tikv/pd/pkg/utils/etcdutil" "github.com/tikv/pd/pkg/utils/logutil" "go.uber.org/zap" @@ -197,7 +198,7 @@ func (s *TSODispatcher) finishRequest(requests []Request, physical, firstLogical // TSDeadline is used to watch the deadline of each tso request. 
type TSDeadline struct { - timer <-chan time.Time + timer *time.Timer done chan struct{} cancel context.CancelFunc } @@ -208,8 +209,9 @@ func NewTSDeadline( done chan struct{}, cancel context.CancelFunc, ) *TSDeadline { + timer := timerpool.GlobalTimerPool.Get(timeout) return &TSDeadline{ - timer: time.After(timeout), + timer: timer, done: done, cancel: cancel, } @@ -224,13 +226,15 @@ func WatchTSDeadline(ctx context.Context, tsDeadlineCh <-chan *TSDeadline) { select { case d := <-tsDeadlineCh: select { - case <-d.timer: + case <-d.timer.C: log.Error("tso proxy request processing is canceled due to timeout", errs.ZapError(errs.ErrProxyTSOTimeout)) d.cancel() + timerpool.GlobalTimerPool.Put(d.timer) case <-d.done: - continue + timerpool.GlobalTimerPool.Put(d.timer) case <-ctx.Done(): + timerpool.GlobalTimerPool.Put(d.timer) return } case <-ctx.Done(): @@ -241,11 +245,12 @@ func WatchTSDeadline(ctx context.Context, tsDeadlineCh <-chan *TSDeadline) { func checkStream(streamCtx context.Context, cancel context.CancelFunc, done chan struct{}) { defer logutil.LogPanic() - + timer := time.NewTimer(3 * time.Second) + defer timer.Stop() select { case <-done: return - case <-time.After(3 * time.Second): + case <-timer.C: cancel() case <-streamCtx.Done(): } diff --git a/server/grpc_service.go b/server/grpc_service.go index 1badabb19d8..f66bd37ed11 100644 --- a/server/grpc_service.go +++ b/server/grpc_service.go @@ -606,13 +606,15 @@ func (s *tsoServer) Send(m *pdpb.TsoResponse) error { }) done <- s.stream.Send(m) }() + timer := time.NewTimer(tsoutil.DefaultTSOProxyTimeout) + defer timer.Stop() select { case err := <-done: if err != nil { atomic.StoreInt32(&s.closed, 1) } return errors.WithStack(err) - case <-time.After(tsoutil.DefaultTSOProxyTimeout): + case <-timer.C: atomic.StoreInt32(&s.closed, 1) return ErrForwardTSOTimeout } @@ -633,6 +635,8 @@ func (s *tsoServer) Recv(timeout time.Duration) (*pdpb.TsoRequest, error) { request, err := s.stream.Recv() requestCh <- 
&pdpbTSORequest{request: request, err: err} }() + timer := time.NewTimer(timeout) + defer timer.Stop() select { case req := <-requestCh: if req.err != nil { @@ -640,7 +644,7 @@ func (s *tsoServer) Recv(timeout time.Duration) (*pdpb.TsoRequest, error) { return nil, errors.WithStack(req.err) } return req.request, nil - case <-time.After(timeout): + case <-timer.C: atomic.StoreInt32(&s.closed, 1) return nil, ErrTSOProxyRecvFromClientTimeout } @@ -2173,10 +2177,12 @@ func forwardReportBucketClientToServer(forwardStream pdpb.PD_ReportBucketsClient // TODO: If goroutine here timeout when tso stream created successfully, we need to handle it correctly. func checkStream(streamCtx context.Context, cancel context.CancelFunc, done chan struct{}) { defer logutil.LogPanic() + timer := time.NewTimer(3 * time.Second) + defer timer.Stop() select { case <-done: return - case <-time.After(3 * time.Second): + case <-timer.C: cancel() case <-streamCtx.Done(): } From 53d5a9224d9826af6b4698cf03010aa955bec9f8 Mon Sep 17 00:00:00 2001 From: Bin Shi <39923490+binshi-bing@users.noreply.github.com> Date: Fri, 30 Jun 2023 21:48:42 -0700 Subject: [PATCH 5/8] mcs, tso: fix split and split-range command bugs. (#6732) close tikv/pd#6687, close tikv/pd#6731 Fix split and split-range command bugs. 
Signed-off-by: Bin Shi --- pkg/keyspace/tso_keyspace_group.go | 16 ++++++++-- pkg/keyspace/tso_keyspace_group_test.go | 41 +++++++++++++++++++++++++ 2 files changed, 55 insertions(+), 2 deletions(-) diff --git a/pkg/keyspace/tso_keyspace_group.go b/pkg/keyspace/tso_keyspace_group.go index ce9af860039..dd9319e806f 100644 --- a/pkg/keyspace/tso_keyspace_group.go +++ b/pkg/keyspace/tso_keyspace_group.go @@ -622,7 +622,17 @@ func buildSplitKeyspaces( oldSplit = append(oldSplit, keyspace) } } - return oldSplit, new, nil + // If newNum != len(newKeyspaceMap), it means the provided new keyspace list contains + // duplicate keyspaces, and we need to dedup them (https://github.com/tikv/pd/issues/6687); + // otherwise, we can just return the old split and new keyspace list. + if newNum == len(newKeyspaceMap) { + return oldSplit, new, nil + } + newSplit := make([]uint32, 0, len(newKeyspaceMap)) + for keyspace := range newKeyspaceMap { + newSplit = append(newSplit, keyspace) + } + return oldSplit, newSplit, nil } // Split according to the start and end keyspace ID. if startKeyspaceID == 0 && endKeyspaceID == 0 { @@ -634,7 +644,9 @@ func buildSplitKeyspaces( ) for _, keyspace := range old { if keyspace == utils.DefaultKeyspaceID { - return nil, nil, ErrModifyDefaultKeyspace + // The source keyspace group must be the default keyspace group and we always keep the default + // keyspace in the default keyspace group. 
+ continue } if startKeyspaceID <= keyspace && keyspace <= endKeyspaceID { newSplit = append(newSplit, keyspace) diff --git a/pkg/keyspace/tso_keyspace_group_test.go b/pkg/keyspace/tso_keyspace_group_test.go index 40b779382cd..e8a40a839c8 100644 --- a/pkg/keyspace/tso_keyspace_group_test.go +++ b/pkg/keyspace/tso_keyspace_group_test.go @@ -483,6 +483,26 @@ func TestBuildSplitKeyspaces(t *testing.T) { new: []uint32{6}, err: ErrKeyspaceNotInKeyspaceGroup, }, + { + old: []uint32{1, 2}, + new: []uint32{2, 2}, + expectedOld: []uint32{1}, + expectedNew: []uint32{2}, + }, + { + old: []uint32{0, 1, 2, 3, 4, 5}, + startKeyspaceID: 2, + endKeyspaceID: 4, + expectedOld: []uint32{0, 1, 5}, + expectedNew: []uint32{2, 3, 4}, + }, + { + old: []uint32{0, 1, 2, 3, 4, 5}, + startKeyspaceID: 0, + endKeyspaceID: 4, + expectedOld: []uint32{0, 5}, + expectedNew: []uint32{1, 2, 3, 4}, + }, { old: []uint32{1, 2, 3, 4, 5}, startKeyspaceID: 2, @@ -490,6 +510,13 @@ func TestBuildSplitKeyspaces(t *testing.T) { expectedOld: []uint32{1, 5}, expectedNew: []uint32{2, 3, 4}, }, + { + old: []uint32{1, 2, 3, 4, 5}, + startKeyspaceID: 5, + endKeyspaceID: 6, + expectedOld: []uint32{1, 2, 3, 4}, + expectedNew: []uint32{5}, + }, { old: []uint32{1, 2, 3, 4, 5}, startKeyspaceID: 2, @@ -497,6 +524,13 @@ func TestBuildSplitKeyspaces(t *testing.T) { expectedOld: []uint32{1}, expectedNew: []uint32{2, 3, 4, 5}, }, + { + old: []uint32{1, 2, 3, 4, 5}, + startKeyspaceID: 1, + endKeyspaceID: 1, + expectedOld: []uint32{2, 3, 4, 5}, + expectedNew: []uint32{1}, + }, { old: []uint32{1, 2, 3, 4, 5}, startKeyspaceID: 0, @@ -504,6 +538,13 @@ func TestBuildSplitKeyspaces(t *testing.T) { expectedOld: []uint32{}, expectedNew: []uint32{1, 2, 3, 4, 5}, }, + { + old: []uint32{1, 2, 3, 4, 5}, + startKeyspaceID: 7, + endKeyspaceID: 10, + expectedOld: []uint32{1, 2, 3, 4, 5}, + expectedNew: []uint32{}, + }, { old: []uint32{1, 2, 3, 4, 5}, err: ErrKeyspaceNotInKeyspaceGroup, From 626d1c8f36959ac43a2d97300458fa6ded993bf7 Mon Sep 17 
00:00:00 2001 From: wuhuizuo Date: Mon, 3 Jul 2023 14:03:42 +0800 Subject: [PATCH 6/8] ci: add configuration fields to codecov.yaml (#6666) ref pingcap-qe/ci#2171, close tikv/pd#6667 ci: add configuration fields to codecov.yaml - `ignore`: ignore integration test cases or tools paths. - `flags`: to split static for unit and integration testing. - `comment`: add `flags` section in comments. Signed-off-by: wuhuizuo --- codecov.yml | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/codecov.yml b/codecov.yml index dc58a648479..44a3aac24b7 100644 --- a/codecov.yml +++ b/codecov.yml @@ -10,3 +10,20 @@ coverage: # basic target: auto threshold: 3% + +comment: + layout: "header, diff, flags" + behavior: default + require_changes: false + +flag_management: + default_rules: # the rules that will be followed for any flag added, generally + carryforward: true + statuses: + - type: project + target: 85% + - type: patch + target: 85% + +ignore: + - tests/** # integration test cases or tools. 
From 87b5adef8f787c55677e80acd01c75829a5f7f17 Mon Sep 17 00:00:00 2001 From: lhy1024 Date: Mon, 3 Jul 2023 14:27:42 +0800 Subject: [PATCH 7/8] keyspace: some cherry-pick from pd-cse (#6477) ref tikv/pd#4399 Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com> --- pkg/keyspace/keyspace.go | 4 -- pkg/keyspace/util.go | 35 ++++++++++---- server/api/region.go | 51 ++++++++++++++++++++ server/api/router.go | 1 + server/keyspace_service.go | 8 ++- server/server.go | 1 + tools/pd-ctl/pdctl/command/region_command.go | 39 +++++++++++++++ 7 files changed, 124 insertions(+), 15 deletions(-) diff --git a/pkg/keyspace/keyspace.go b/pkg/keyspace/keyspace.go index adcb0be3106..df7eb653828 100644 --- a/pkg/keyspace/keyspace.go +++ b/pkg/keyspace/keyspace.go @@ -374,7 +374,6 @@ func (manager *Manager) LoadKeyspace(name string) (*keyspacepb.KeyspaceMeta, err if meta == nil { return ErrKeyspaceNotFound } - meta.Id = id return nil }) return meta, err @@ -397,9 +396,6 @@ func (manager *Manager) LoadKeyspaceByID(spaceID uint32) (*keyspacepb.KeyspaceMe } return nil }) - if meta != nil { - meta.Id = spaceID - } return meta, err } diff --git a/pkg/keyspace/util.go b/pkg/keyspace/util.go index bf4f8413bb1..240306f8124 100644 --- a/pkg/keyspace/util.go +++ b/pkg/keyspace/util.go @@ -135,7 +135,7 @@ func MaskKeyspaceID(id uint32) uint32 { return id & 0xFF } -// makeKeyRanges encodes keyspace ID to correct LabelRule data. +// RegionBound represents the region boundary of the given keyspace. // For a keyspace with id ['a', 'b', 'c'], it has four boundaries: // // Lower bound for raw mode: ['r', 'a', 'b', 'c'] @@ -147,23 +147,38 @@ func MaskKeyspaceID(id uint32) uint32 { // And shares upper bound with keyspace with id ['a', 'b', 'c + 1']. // These repeated bound will not cause any problem, as repetitive bound will be ignored during rangeListBuild, // but provides guard against hole in keyspace allocations should it occur. 
-func makeKeyRanges(id uint32) []interface{} { +type RegionBound struct { + RawLeftBound []byte + RawRightBound []byte + TxnLeftBound []byte + TxnRightBound []byte +} + +// MakeRegionBound constructs the correct region boundaries of the given keyspace. +func MakeRegionBound(id uint32) *RegionBound { keyspaceIDBytes := make([]byte, 4) nextKeyspaceIDBytes := make([]byte, 4) binary.BigEndian.PutUint32(keyspaceIDBytes, id) binary.BigEndian.PutUint32(nextKeyspaceIDBytes, id+1) - rawLeftBound := hex.EncodeToString(codec.EncodeBytes(append([]byte{'r'}, keyspaceIDBytes[1:]...))) - rawRightBound := hex.EncodeToString(codec.EncodeBytes(append([]byte{'r'}, nextKeyspaceIDBytes[1:]...))) - txnLeftBound := hex.EncodeToString(codec.EncodeBytes(append([]byte{'x'}, keyspaceIDBytes[1:]...))) - txnRightBound := hex.EncodeToString(codec.EncodeBytes(append([]byte{'x'}, nextKeyspaceIDBytes[1:]...))) + return &RegionBound{ + RawLeftBound: codec.EncodeBytes(append([]byte{'r'}, keyspaceIDBytes[1:]...)), + RawRightBound: codec.EncodeBytes(append([]byte{'r'}, nextKeyspaceIDBytes[1:]...)), + TxnLeftBound: codec.EncodeBytes(append([]byte{'x'}, keyspaceIDBytes[1:]...)), + TxnRightBound: codec.EncodeBytes(append([]byte{'x'}, nextKeyspaceIDBytes[1:]...)), + } +} + +// makeKeyRanges encodes keyspace ID to correct LabelRule data. 
+func makeKeyRanges(id uint32) []interface{} { + regionBound := MakeRegionBound(id) return []interface{}{ map[string]interface{}{ - "start_key": rawLeftBound, - "end_key": rawRightBound, + "start_key": hex.EncodeToString(regionBound.RawLeftBound), + "end_key": hex.EncodeToString(regionBound.RawRightBound), }, map[string]interface{}{ - "start_key": txnLeftBound, - "end_key": txnRightBound, + "start_key": hex.EncodeToString(regionBound.TxnLeftBound), + "end_key": hex.EncodeToString(regionBound.TxnRightBound), }, } } diff --git a/server/api/region.go b/server/api/region.go index e17b9c8da25..894a85ecd56 100644 --- a/server/api/region.go +++ b/server/api/region.go @@ -31,6 +31,7 @@ import ( "github.com/pingcap/kvproto/pkg/replication_modepb" "github.com/pingcap/log" "github.com/tikv/pd/pkg/core" + "github.com/tikv/pd/pkg/keyspace" "github.com/tikv/pd/pkg/schedule/filter" "github.com/tikv/pd/pkg/statistics" "github.com/tikv/pd/pkg/utils/apiutil" @@ -398,6 +399,56 @@ func (h *regionsHandler) GetStoreRegions(w http.ResponseWriter, r *http.Request) h.rd.JSON(w, http.StatusOK, regionsInfo) } +// @Tags region +// @Summary List regions belongs to the given keyspace ID. +// @Param keyspace_id query string true "Keyspace ID" +// @Param limit query integer false "Limit count" default(16) +// @Produce json +// @Success 200 {object} RegionsInfo +// @Failure 400 {string} string "The input is invalid." 
+// @Router /regions/keyspace/id/{id} [get] +func (h *regionsHandler) GetKeyspaceRegions(w http.ResponseWriter, r *http.Request) { + rc := getCluster(r) + vars := mux.Vars(r) + keyspaceIDStr := vars["id"] + if keyspaceIDStr == "" { + h.rd.JSON(w, http.StatusBadRequest, "keyspace id is empty") + return + } + + keyspaceID64, err := strconv.ParseUint(keyspaceIDStr, 10, 32) + if err != nil { + h.rd.JSON(w, http.StatusBadRequest, err.Error()) + return + } + keyspaceID := uint32(keyspaceID64) + keyspaceManager := h.svr.GetKeyspaceManager() + if _, err := keyspaceManager.LoadKeyspaceByID(keyspaceID); err != nil { + h.rd.JSON(w, http.StatusBadRequest, err.Error()) + return + } + + limit := defaultRegionLimit + if limitStr := r.URL.Query().Get("limit"); limitStr != "" { + limit, err = strconv.Atoi(limitStr) + if err != nil { + h.rd.JSON(w, http.StatusBadRequest, err.Error()) + return + } + } + if limit > maxRegionLimit { + limit = maxRegionLimit + } + regionBound := keyspace.MakeRegionBound(keyspaceID) + regions := rc.ScanRegions(regionBound.RawLeftBound, regionBound.RawRightBound, limit) + if limit <= 0 || limit > len(regions) { + txnRegion := rc.ScanRegions(regionBound.TxnLeftBound, regionBound.TxnRightBound, limit-len(regions)) + regions = append(regions, txnRegion...) + } + regionsInfo := convertToAPIRegions(regions) + h.rd.JSON(w, http.StatusOK, regionsInfo) +} + // @Tags region // @Summary List all regions that miss peer. 
// @Produce json diff --git a/server/api/router.go b/server/api/router.go index 0fc77ba6b19..2b030237340 100644 --- a/server/api/router.go +++ b/server/api/router.go @@ -246,6 +246,7 @@ func createRouter(prefix string, svr *server.Server) *mux.Router { registerFunc(clusterRouter, "/regions/key", regionsHandler.ScanRegions, setMethods(http.MethodGet), setAuditBackend(prometheus)) registerFunc(clusterRouter, "/regions/count", regionsHandler.GetRegionCount, setMethods(http.MethodGet), setAuditBackend(prometheus)) registerFunc(clusterRouter, "/regions/store/{id}", regionsHandler.GetStoreRegions, setMethods(http.MethodGet), setAuditBackend(prometheus)) + registerFunc(clusterRouter, "/regions/keyspace/id/{id}", regionsHandler.GetKeyspaceRegions, setMethods(http.MethodGet), setAuditBackend(prometheus)) registerFunc(clusterRouter, "/regions/writeflow", regionsHandler.GetTopWriteFlowRegions, setMethods(http.MethodGet), setAuditBackend(prometheus)) registerFunc(clusterRouter, "/regions/readflow", regionsHandler.GetTopReadFlowRegions, setMethods(http.MethodGet), setAuditBackend(prometheus)) registerFunc(clusterRouter, "/regions/confver", regionsHandler.GetTopConfVerRegions, setMethods(http.MethodGet), setAuditBackend(prometheus)) diff --git a/server/keyspace_service.go b/server/keyspace_service.go index 8336b988ece..c5fa7377178 100644 --- a/server/keyspace_service.go +++ b/server/keyspace_service.go @@ -80,6 +80,7 @@ func (s *KeyspaceServer) WatchKeyspaces(request *keyspacepb.WatchKeyspacesReques putFn := func(kv *mvccpb.KeyValue) error { meta := &keyspacepb.KeyspaceMeta{} if err := proto.Unmarshal(kv.Value, meta); err != nil { + defer cancel() // cancel context to stop watcher return err } keyspaces = append(keyspaces, meta) @@ -92,9 +93,14 @@ func (s *KeyspaceServer) WatchKeyspaces(request *keyspacepb.WatchKeyspacesReques defer func() { keyspaces = keyspaces[:0] }() - return stream.Send(&keyspacepb.WatchKeyspacesResponse{ + err := 
stream.Send(&keyspacepb.WatchKeyspacesResponse{ Header: s.header(), Keyspaces: keyspaces}) + if err != nil { + defer cancel() // cancel context to stop watcher + return err + } + return nil } watcher := etcdutil.NewLoopWatcher( diff --git a/server/server.go b/server/server.go index 40f5f4d59bd..08d6896a3ef 100644 --- a/server/server.go +++ b/server/server.go @@ -1852,6 +1852,7 @@ func (s *Server) initTSOPrimaryWatcher() { if len(listenUrls) > 0 { // listenUrls[0] is the primary service endpoint of the keyspace group s.servicePrimaryMap.Store(serviceName, listenUrls[0]) + log.Info("update tso primary", zap.String("primary", listenUrls[0])) } return nil } diff --git a/tools/pd-ctl/pdctl/command/region_command.go b/tools/pd-ctl/pdctl/command/region_command.go index fcebb30e6d8..33191bbe12b 100644 --- a/tools/pd-ctl/pdctl/command/region_command.go +++ b/tools/pd-ctl/pdctl/command/region_command.go @@ -45,6 +45,7 @@ var ( regionsKeyPrefix = "pd/api/v1/regions/key" regionsSiblingPrefix = "pd/api/v1/regions/sibling" regionsRangeHolesPrefix = "pd/api/v1/regions/range-holes" + regionsKeyspacePrefix = "pd/api/v1/regions/keyspace" regionIDPrefix = "pd/api/v1/region/id" regionKeyPrefix = "pd/api/v1/region/key" ) @@ -60,6 +61,7 @@ func NewRegionCommand() *cobra.Command { r.AddCommand(NewRegionWithCheckCommand()) r.AddCommand(NewRegionWithSiblingCommand()) r.AddCommand(NewRegionWithStoreCommand()) + r.AddCommand(NewRegionWithKeyspaceCommand()) r.AddCommand(NewRegionsByKeysCommand()) r.AddCommand(NewRangesWithRangeHolesCommand()) @@ -463,6 +465,43 @@ func showRegionWithStoreCommandFunc(cmd *cobra.Command, args []string) { cmd.Println(r) } +// NewRegionWithKeyspaceCommand returns regions with keyspace subcommand of regionCmd +func NewRegionWithKeyspaceCommand() *cobra.Command { + r := &cobra.Command{ + Use: "keyspace <subcommand>", + Short: "show region information of the given keyspace", + } + r.AddCommand(&cobra.Command{ + Use: "id <keyspace_id> [<limit>]", + Short: "show region information for the given keyspace
id", + Run: showRegionWithKeyspaceCommandFunc, + }) + return r +} + +func showRegionWithKeyspaceCommandFunc(cmd *cobra.Command, args []string) { + if len(args) < 1 || len(args) > 2 { + cmd.Println(cmd.UsageString()) + return + } + + keyspaceID := args[0] + prefix := regionsKeyspacePrefix + "/id/" + keyspaceID + if len(args) == 2 { + if _, err := strconv.Atoi(args[1]); err != nil { + cmd.Println("limit should be a number") + return + } + prefix += "?limit=" + args[1] + } + r, err := doRequest(cmd, prefix, http.MethodGet, http.Header{}) + if err != nil { + cmd.Printf("Failed to get regions with the given keyspace: %s\n", err) + return + } + cmd.Println(r) +} + const ( rangeHolesLongDesc = `There are some cases that the region range is not continuous, for example, the region doesn't send the heartbeat to PD after a splitting. This command will output all empty ranges without any region info.` From 5f7236fb55e6ba01bbbafbc86749ac922b351154 Mon Sep 17 00:00:00 2001 From: glorv Date: Mon, 3 Jul 2023 14:53:12 +0800 Subject: [PATCH 8/8] resourcemanager: do not check existence when add resource group (#6717) ref tikv/pd#5851, ref pingcap/tidb#45050 Signed-off-by: glorv Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com> --- errors.toml | 5 ----- pkg/errs/errno.go | 7 +++---- pkg/mcs/resourcemanager/server/manager.go | 8 ++------ .../mcs/resourcemanager/resource_manager_test.go | 12 +++++------- 4 files changed, 10 insertions(+), 22 deletions(-) diff --git a/errors.toml b/errors.toml index ed3cd32d52a..43fc6a582aa 100644 --- a/errors.toml +++ b/errors.toml @@ -606,11 +606,6 @@ error = ''' invalid group settings, please check the group name, priority and the number of resources ''' -["PD:resourcemanager:ErrResourceGroupAlreadyExists"] -error = ''' -the %s resource group already exists -''' - ["PD:schedule:ErrCreateOperator"] error = ''' unable to create operator, %s diff --git a/pkg/errs/errno.go b/pkg/errs/errno.go index 
0fda57fa7c9..0bd2a57dba5 100644 --- a/pkg/errs/errno.go +++ b/pkg/errs/errno.go @@ -372,8 +372,7 @@ var ( // Resource Manager errors var ( - ErrResourceGroupAlreadyExists = errors.Normalize("the %s resource group already exists", errors.RFCCodeText("PD:resourcemanager:ErrResourceGroupAlreadyExists")) - ErrResourceGroupNotExists = errors.Normalize("the %s resource group does not exist", errors.RFCCodeText("PD:resourcemanager:ErrGroupNotExists")) - ErrDeleteReservedGroup = errors.Normalize("cannot delete reserved group", errors.RFCCodeText("PD:resourcemanager:ErrDeleteReservedGroup")) - ErrInvalidGroup = errors.Normalize("invalid group settings, please check the group name, priority and the number of resources", errors.RFCCodeText("PD:resourcemanager:ErrInvalidGroup")) + ErrResourceGroupNotExists = errors.Normalize("the %s resource group does not exist", errors.RFCCodeText("PD:resourcemanager:ErrGroupNotExists")) + ErrDeleteReservedGroup = errors.Normalize("cannot delete reserved group", errors.RFCCodeText("PD:resourcemanager:ErrDeleteReservedGroup")) + ErrInvalidGroup = errors.Normalize("invalid group settings, please check the group name, priority and the number of resources", errors.RFCCodeText("PD:resourcemanager:ErrInvalidGroup")) ) diff --git a/pkg/mcs/resourcemanager/server/manager.go b/pkg/mcs/resourcemanager/server/manager.go index a98d274f506..b054a37e0ac 100644 --- a/pkg/mcs/resourcemanager/server/manager.go +++ b/pkg/mcs/resourcemanager/server/manager.go @@ -154,6 +154,8 @@ func (m *Manager) Init(ctx context.Context) { } // AddResourceGroup puts a resource group. +// NOTE: AddResourceGroup should also be idempotent because tidb depends +// on this retry mechanism. func (m *Manager) AddResourceGroup(grouppb *rmpb.ResourceGroup) error { // Check the name. 
if len(grouppb.Name) == 0 || len(grouppb.Name) > 32 { @@ -163,12 +165,6 @@ func (m *Manager) AddResourceGroup(grouppb *rmpb.ResourceGroup) error { if grouppb.GetPriority() > 16 { return errs.ErrInvalidGroup } - m.RLock() - _, ok := m.groups[grouppb.Name] - m.RUnlock() - if ok { - return errs.ErrResourceGroupAlreadyExists.FastGenByArgs(grouppb.Name) - } group := FromProtoResourceGroup(grouppb) m.Lock() defer m.Unlock() diff --git a/tests/integrations/mcs/resourcemanager/resource_manager_test.go b/tests/integrations/mcs/resourcemanager/resource_manager_test.go index 467eba6c518..5cbed81d57e 100644 --- a/tests/integrations/mcs/resourcemanager/resource_manager_test.go +++ b/tests/integrations/mcs/resourcemanager/resource_manager_test.go @@ -711,7 +711,7 @@ func (suite *resourceManagerClientTestSuite) TestBasicResourceGroupCURD() { testCasesSet1 := []struct { name string mode rmpb.GroupMode - addSuccess bool + isNewGroup bool modifySuccess bool expectMarshal string modifySettings func(*rmpb.ResourceGroup) @@ -789,8 +789,8 @@ func (suite *resourceManagerClientTestSuite) TestBasicResourceGroupCURD() { } // Create Resource Group resp, err := cli.AddResourceGroup(suite.ctx, group) - checkErr(err, tcase.addSuccess) - if tcase.addSuccess { + checkErr(err, true) + if tcase.isNewGroup { finalNum++ re.Contains(resp, "Success!") } @@ -860,11 +860,9 @@ func (suite *resourceManagerClientTestSuite) TestBasicResourceGroupCURD() { resp, err := http.Post(getAddr(i)+"/resource-manager/api/v1/config/group", "application/json", strings.NewReader(string(createJSON))) re.NoError(err) defer resp.Body.Close() - if tcase.addSuccess { - re.Equal(http.StatusOK, resp.StatusCode) + re.Equal(http.StatusOK, resp.StatusCode) + if tcase.isNewGroup { finalNum++ - } else { - re.Equal(http.StatusInternalServerError, resp.StatusCode) } // Modify Resource Group