Skip to content

Commit

Permalink
Merge branch 'master' of github.com:tikv/pd into sche-redirect10
Browse files Browse the repository at this point in the history
Signed-off-by: lhy1024 <[email protected]>
  • Loading branch information
lhy1024 committed Nov 10, 2023
2 parents 0452c63 + afe6afc commit 1dc633c
Show file tree
Hide file tree
Showing 60 changed files with 3,103 additions and 1,219 deletions.
30 changes: 30 additions & 0 deletions errors.toml
Original file line number Diff line number Diff line change
Expand Up @@ -496,6 +496,16 @@ error = '''
init file log error, %s
'''

["PD:mcs:ErrNotFoundSchedulingAddr"]
error = '''
cannot find scheduling address
'''

["PD:mcs:ErrSchedulingServer"]
error = '''
scheduling server meets %v
'''

["PD:member:ErrCheckCampaign"]
error = '''
check campaign failed
Expand Down Expand Up @@ -541,6 +551,11 @@ error = '''
build rule list failed, %s
'''

["PD:placement:ErrKeyFormat"]
error = '''
key should be in hex format, %s
'''

["PD:placement:ErrLoadRule"]
error = '''
load rule failed
Expand All @@ -551,11 +566,21 @@ error = '''
load rule group failed
'''

["PD:placement:ErrPlacementDisabled"]
error = '''
placement rules feature is disabled
'''

["PD:placement:ErrRuleContent"]
error = '''
invalid rule content, %s
'''

["PD:placement:ErrRuleNotFound"]
error = '''
rule not found
'''

["PD:plugin:ErrLoadPlugin"]
error = '''
failed to load plugin
Expand Down Expand Up @@ -606,6 +631,11 @@ error = '''
region %v has abnormal peer
'''

["PD:region:ErrRegionInvalidID"]
error = '''
invalid region id
'''

["PD:region:ErrRegionNotAdjacent"]
error = '''
two regions are not adjacent
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ require (
github.com/pingcap/kvproto v0.0.0-20231018065736-c0689aded40c
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3
github.com/pingcap/sysutil v1.0.1-0.20230407040306-fb007c5aff21
github.com/pingcap/tidb-dashboard v0.0.0-20230911054332-22add1e00511
github.com/pingcap/tidb-dashboard v0.0.0-20231108071238-7cb8b7ff0537
github.com/prometheus/client_golang v1.11.1
github.com/prometheus/common v0.26.0
github.com/sasha-s/go-deadlock v0.2.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -446,8 +446,8 @@ github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 h1:HR/ylkkLmGdSSDaD8
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4=
github.com/pingcap/sysutil v1.0.1-0.20230407040306-fb007c5aff21 h1:QV6jqlfOkh8hqvEAgwBZa+4bSgO0EeKC7s5c6Luam2I=
github.com/pingcap/sysutil v1.0.1-0.20230407040306-fb007c5aff21/go.mod h1:QYnjfA95ZaMefyl1NO8oPtKeb8pYUdnDVhQgf+qdpjM=
github.com/pingcap/tidb-dashboard v0.0.0-20230911054332-22add1e00511 h1:oyrCfNlAWmLlUfEr+7YTSBo29SP/J1N8hnxBt5yUABo=
github.com/pingcap/tidb-dashboard v0.0.0-20230911054332-22add1e00511/go.mod h1:EZ90+V5S4TttbYag6oKZ3jcNKRwZe1Mc9vXwOt9JBYw=
github.com/pingcap/tidb-dashboard v0.0.0-20231108071238-7cb8b7ff0537 h1:wnHt7ETIB0vm+gbLx8QhcIEmRtrT4QlWlfpcI9vjxOk=
github.com/pingcap/tidb-dashboard v0.0.0-20231108071238-7cb8b7ff0537/go.mod h1:EZ90+V5S4TttbYag6oKZ3jcNKRwZe1Mc9vXwOt9JBYw=
github.com/pingcap/tipb v0.0.0-20220718022156-3e2483c20a9e h1:FBaTXU8C3xgt/drM58VHxojHo/QoG1oPsgWTGvaSpO4=
github.com/pingcap/tipb v0.0.0-20220718022156-3e2483c20a9e/go.mod h1:A7mrd7WHBl1o63LE2bIBGEJMTNWXqhgmYiOvMLxozfs=
github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA=
Expand Down
36 changes: 27 additions & 9 deletions pkg/core/region.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,10 @@ import (
"go.uber.org/zap"
)

const randomRegionMaxRetry = 10
const (
randomRegionMaxRetry = 10
scanRegionLimit = 1000
)

// errRegionIsStale is error info for region is stale.
func errRegionIsStale(region *metapb.Region, origin *metapb.Region) error {
Expand Down Expand Up @@ -1610,16 +1613,31 @@ func (r *RegionsInfo) ScanRegionWithIterator(startKey []byte, iterator func(regi

// GetRegionSizeByRange scans regions intersecting [start key, end key), returns the total region size of this range.
func (r *RegionsInfo) GetRegionSizeByRange(startKey, endKey []byte) int64 {
r.t.RLock()
defer r.t.RUnlock()
var size int64
r.tree.scanRange(startKey, func(region *RegionInfo) bool {
if len(endKey) > 0 && bytes.Compare(region.GetStartKey(), endKey) >= 0 {
return false
for {
r.t.RLock()
var cnt int
r.tree.scanRange(startKey, func(region *RegionInfo) bool {
if len(endKey) > 0 && bytes.Compare(region.GetStartKey(), endKey) >= 0 {
return false
}
if cnt >= scanRegionLimit {
return false
}
cnt++
startKey = region.GetEndKey()
size += region.GetApproximateSize()
return true
})
r.t.RUnlock()
if cnt == 0 {
break
}
size += region.GetApproximateSize()
return true
})
if len(startKey) == 0 {
break
}
}

return size
}

Expand Down
120 changes: 120 additions & 0 deletions pkg/core/region_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,10 @@ import (
"crypto/rand"
"fmt"
"math"
mrand "math/rand"
"strconv"
"testing"
"time"

"github.com/pingcap/failpoint"
"github.com/pingcap/kvproto/pkg/metapb"
Expand Down Expand Up @@ -658,6 +660,124 @@ func BenchmarkRandomRegion(b *testing.B) {
}
}

func BenchmarkRandomSetRegion(b *testing.B) {
regions := NewRegionsInfo()
var items []*RegionInfo
for i := 0; i < 1000000; i++ {
peer := &metapb.Peer{StoreId: 1, Id: uint64(i + 1)}
region := NewRegionInfo(&metapb.Region{
Id: uint64(i + 1),
Peers: []*metapb.Peer{peer},
StartKey: []byte(fmt.Sprintf("%20d", i)),
EndKey: []byte(fmt.Sprintf("%20d", i+1)),
}, peer)
origin, overlaps, rangeChanged := regions.SetRegion(region)
regions.UpdateSubTree(region, origin, overlaps, rangeChanged)
items = append(items, region)
}
b.ResetTimer()
for i := 0; i < b.N; i++ {
item := items[i%len(items)]
item.approximateKeys = int64(200000)
item.approximateSize = int64(20)
origin, overlaps, rangeChanged := regions.SetRegion(item)
regions.UpdateSubTree(item, origin, overlaps, rangeChanged)
}
}

func TestGetRegionSizeByRange(t *testing.T) {
regions := NewRegionsInfo()
nums := 1000010
for i := 0; i < nums; i++ {
peer := &metapb.Peer{StoreId: 1, Id: uint64(i + 1)}
endKey := []byte(fmt.Sprintf("%20d", i+1))
if i == nums-1 {
endKey = []byte("")
}
region := NewRegionInfo(&metapb.Region{
Id: uint64(i + 1),
Peers: []*metapb.Peer{peer},
StartKey: []byte(fmt.Sprintf("%20d", i)),
EndKey: endKey,
}, peer, SetApproximateSize(10))
origin, overlaps, rangeChanged := regions.SetRegion(region)
regions.UpdateSubTree(region, origin, overlaps, rangeChanged)
}
totalSize := regions.GetRegionSizeByRange([]byte(""), []byte(""))
require.Equal(t, int64(nums*10), totalSize)
for i := 1; i < 10; i++ {
verifyNum := nums / i
endKey := fmt.Sprintf("%20d", verifyNum)
totalSize := regions.GetRegionSizeByRange([]byte(""), []byte(endKey))
require.Equal(t, int64(verifyNum*10), totalSize)
}
}

func BenchmarkRandomSetRegionWithGetRegionSizeByRange(b *testing.B) {
regions := NewRegionsInfo()
var items []*RegionInfo
for i := 0; i < 1000000; i++ {
peer := &metapb.Peer{StoreId: 1, Id: uint64(i + 1)}
region := NewRegionInfo(&metapb.Region{
Id: uint64(i + 1),
Peers: []*metapb.Peer{peer},
StartKey: []byte(fmt.Sprintf("%20d", i)),
EndKey: []byte(fmt.Sprintf("%20d", i+1)),
}, peer, SetApproximateSize(10))
origin, overlaps, rangeChanged := regions.SetRegion(region)
regions.UpdateSubTree(region, origin, overlaps, rangeChanged)
items = append(items, region)
}
b.ResetTimer()
go func() {
for {
regions.GetRegionSizeByRange([]byte(""), []byte(""))
time.Sleep(time.Millisecond)
}
}()
for i := 0; i < b.N; i++ {
item := items[i%len(items)]
item.approximateKeys = int64(200000)
origin, overlaps, rangeChanged := regions.SetRegion(item)
regions.UpdateSubTree(item, origin, overlaps, rangeChanged)
}
}

func BenchmarkRandomSetRegionWithGetRegionSizeByRangeParallel(b *testing.B) {
regions := NewRegionsInfo()
var items []*RegionInfo
for i := 0; i < 1000000; i++ {
peer := &metapb.Peer{StoreId: 1, Id: uint64(i + 1)}
region := NewRegionInfo(&metapb.Region{
Id: uint64(i + 1),
Peers: []*metapb.Peer{peer},
StartKey: []byte(fmt.Sprintf("%20d", i)),
EndKey: []byte(fmt.Sprintf("%20d", i+1)),
}, peer)
origin, overlaps, rangeChanged := regions.SetRegion(region)
regions.UpdateSubTree(region, origin, overlaps, rangeChanged)
items = append(items, region)
}
b.ResetTimer()
go func() {
for {
regions.GetRegionSizeByRange([]byte(""), []byte(""))
time.Sleep(time.Millisecond)
}
}()

b.RunParallel(
func(pb *testing.PB) {
for pb.Next() {
item := items[mrand.Intn(len(items))]
n := item.Clone(SetApproximateSize(20))
origin, overlaps, rangeChanged := regions.SetRegion(n)
regions.UpdateSubTree(item, origin, overlaps, rangeChanged)
}
},
)
}

const keyLength = 100

func randomBytes(n int) []byte {
Expand Down
19 changes: 15 additions & 4 deletions pkg/errs/errno.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,8 @@ var (

// region errors
var (
// ErrRegionInvalidID is error info for region id invalid.
ErrRegionInvalidID = errors.Normalize("invalid region id", errors.RFCCodeText("PD:region:ErrRegionInvalidID"))
// ErrRegionNotAdjacent is error info for region not adjacent.
ErrRegionNotAdjacent = errors.Normalize("two regions are not adjacent", errors.RFCCodeText("PD:region:ErrRegionNotAdjacent"))
// ErrRegionNotFound is error info for region not found.
Expand Down Expand Up @@ -153,10 +155,13 @@ var (

// placement errors
var (
ErrRuleContent = errors.Normalize("invalid rule content, %s", errors.RFCCodeText("PD:placement:ErrRuleContent"))
ErrLoadRule = errors.Normalize("load rule failed", errors.RFCCodeText("PD:placement:ErrLoadRule"))
ErrLoadRuleGroup = errors.Normalize("load rule group failed", errors.RFCCodeText("PD:placement:ErrLoadRuleGroup"))
ErrBuildRuleList = errors.Normalize("build rule list failed, %s", errors.RFCCodeText("PD:placement:ErrBuildRuleList"))
ErrRuleContent = errors.Normalize("invalid rule content, %s", errors.RFCCodeText("PD:placement:ErrRuleContent"))
ErrLoadRule = errors.Normalize("load rule failed", errors.RFCCodeText("PD:placement:ErrLoadRule"))
ErrLoadRuleGroup = errors.Normalize("load rule group failed", errors.RFCCodeText("PD:placement:ErrLoadRuleGroup"))
ErrBuildRuleList = errors.Normalize("build rule list failed, %s", errors.RFCCodeText("PD:placement:ErrBuildRuleList"))
ErrPlacementDisabled = errors.Normalize("placement rules feature is disabled", errors.RFCCodeText("PD:placement:ErrPlacementDisabled"))
ErrKeyFormat = errors.Normalize("key should be in hex format, %s", errors.RFCCodeText("PD:placement:ErrKeyFormat"))
ErrRuleNotFound = errors.Normalize("rule not found", errors.RFCCodeText("PD:placement:ErrRuleNotFound"))
)

// region label errors
Expand Down Expand Up @@ -403,3 +408,9 @@ var (
ErrDeleteReservedGroup = errors.Normalize("cannot delete reserved group", errors.RFCCodeText("PD:resourcemanager:ErrDeleteReservedGroup"))
ErrInvalidGroup = errors.Normalize("invalid group settings, please check the group name, priority and the number of resources", errors.RFCCodeText("PD:resourcemanager:ErrInvalidGroup"))
)

// Micro service errors
var (
ErrNotFoundSchedulingAddr = errors.Normalize("cannot find scheduling address", errors.RFCCodeText("PD:mcs:ErrNotFoundSchedulingAddr"))
ErrSchedulingServer = errors.Normalize("scheduling server meets %v", errors.RFCCodeText("PD:mcs:ErrSchedulingServer"))
)
2 changes: 1 addition & 1 deletion pkg/mcs/resourcemanager/server/apis/v1/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,10 +81,10 @@ func NewService(srv *rmserver.Service) *Service {
c.Set(multiservicesapi.ServiceContextKey, manager.GetBasicServer())
c.Next()
})
apiHandlerEngine.Use(multiservicesapi.ServiceRedirector())
apiHandlerEngine.GET("metrics", utils.PromHandler())
pprof.Register(apiHandlerEngine)
endpoint := apiHandlerEngine.Group(APIPathPrefix)
endpoint.Use(multiservicesapi.ServiceRedirector())
s := &Service{
manager: manager,
apiHandlerEngine: apiHandlerEngine,
Expand Down
20 changes: 20 additions & 0 deletions pkg/mcs/resourcemanager/server/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,26 @@ func (c *Config) adjustLog(meta *configutil.ConfigMetaData) {
}
}

// GetName returns the Name
func (c *Config) GetName() string {
return c.Name
}

// GeBackendEndpoints returns the BackendEndpoints
func (c *Config) GeBackendEndpoints() string {
return c.BackendEndpoints
}

// GetListenAddr returns the ListenAddr
func (c *Config) GetListenAddr() string {
return c.ListenAddr
}

// GetAdvertiseListenAddr returns the AdvertiseListenAddr
func (c *Config) GetAdvertiseListenAddr() string {
return c.AdvertiseListenAddr
}

// GetTLSConfig returns the TLS config.
func (c *Config) GetTLSConfig() *grpcutil.TLSConfig {
return &c.Security.TLSConfig
Expand Down
4 changes: 2 additions & 2 deletions pkg/mcs/resourcemanager/server/resource_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ func FromProtoResourceGroup(group *rmpb.ResourceGroup) *ResourceGroup {
// RequestRU requests the RU of the resource group.
func (rg *ResourceGroup) RequestRU(
now time.Time,
neededTokens float64,
requiredToken float64,
targetPeriodMs, clientUniqueID uint64,
) *rmpb.GrantedRUTokenBucket {
rg.Lock()
Expand All @@ -147,7 +147,7 @@ func (rg *ResourceGroup) RequestRU(
if rg.RUSettings == nil || rg.RUSettings.RU.Settings == nil {
return nil
}
tb, trickleTimeMs := rg.RUSettings.RU.request(now, neededTokens, targetPeriodMs, clientUniqueID)
tb, trickleTimeMs := rg.RUSettings.RU.request(now, requiredToken, targetPeriodMs, clientUniqueID)
return &rmpb.GrantedRUTokenBucket{GrantedTokens: tb, TrickleTimeMs: trickleTimeMs}
}

Expand Down
6 changes: 3 additions & 3 deletions pkg/mcs/resourcemanager/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -296,14 +296,14 @@ func (s *Server) startServer() (err error) {
// different service modes provided by the same pd-server binary
serverInfo.WithLabelValues(versioninfo.PDReleaseVersion, versioninfo.PDGitHash).Set(float64(time.Now().Unix()))

uniqueName := s.cfg.ListenAddr
uniqueName := s.cfg.GetAdvertiseListenAddr()
uniqueID := memberutil.GenerateUniqueID(uniqueName)
log.Info("joining primary election", zap.String("participant-name", uniqueName), zap.Uint64("participant-id", uniqueID))
s.participant = member.NewParticipant(s.GetClient(), utils.ResourceManagerServiceName)
p := &resource_manager.Participant{
Name: uniqueName,
Id: uniqueID, // id is unique among all participants
ListenUrls: []string{s.cfg.AdvertiseListenAddr},
ListenUrls: []string{s.cfg.GetAdvertiseListenAddr()},
}
s.participant.InitInfo(p, endpoint.ResourceManagerSvcRootPath(s.clusterID), utils.PrimaryKey, "primary election")

Expand All @@ -312,7 +312,7 @@ func (s *Server) startServer() (err error) {
manager: NewManager[*Server](s),
}

if err := s.InitListener(s.GetTLSConfig(), s.cfg.ListenAddr); err != nil {
if err := s.InitListener(s.GetTLSConfig(), s.cfg.GetListenAddr()); err != nil {
return err
}

Expand Down
Loading

0 comments on commit 1dc633c

Please sign in to comment.