Skip to content

Commit

Permalink
Merge branch 'master' into sche-redirect6
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-chi-bot[bot] committed Nov 10, 2023
2 parents 8e77510 + f1cee6c commit aae64d7
Show file tree
Hide file tree
Showing 24 changed files with 344 additions and 113 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ require (
github.com/pingcap/kvproto v0.0.0-20231018065736-c0689aded40c
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3
github.com/pingcap/sysutil v1.0.1-0.20230407040306-fb007c5aff21
github.com/pingcap/tidb-dashboard v0.0.0-20231102083420-865955cd15d9
github.com/pingcap/tidb-dashboard v0.0.0-20231108071238-7cb8b7ff0537
github.com/prometheus/client_golang v1.11.1
github.com/prometheus/common v0.26.0
github.com/sasha-s/go-deadlock v0.2.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -446,8 +446,8 @@ github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 h1:HR/ylkkLmGdSSDaD8
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4=
github.com/pingcap/sysutil v1.0.1-0.20230407040306-fb007c5aff21 h1:QV6jqlfOkh8hqvEAgwBZa+4bSgO0EeKC7s5c6Luam2I=
github.com/pingcap/sysutil v1.0.1-0.20230407040306-fb007c5aff21/go.mod h1:QYnjfA95ZaMefyl1NO8oPtKeb8pYUdnDVhQgf+qdpjM=
github.com/pingcap/tidb-dashboard v0.0.0-20231102083420-865955cd15d9 h1:xIeaDUq2ItkYMIgpWXAYKC/N3hs8aurfFvvz79lhHYE=
github.com/pingcap/tidb-dashboard v0.0.0-20231102083420-865955cd15d9/go.mod h1:EZ90+V5S4TttbYag6oKZ3jcNKRwZe1Mc9vXwOt9JBYw=
github.com/pingcap/tidb-dashboard v0.0.0-20231108071238-7cb8b7ff0537 h1:wnHt7ETIB0vm+gbLx8QhcIEmRtrT4QlWlfpcI9vjxOk=
github.com/pingcap/tidb-dashboard v0.0.0-20231108071238-7cb8b7ff0537/go.mod h1:EZ90+V5S4TttbYag6oKZ3jcNKRwZe1Mc9vXwOt9JBYw=
github.com/pingcap/tipb v0.0.0-20220718022156-3e2483c20a9e h1:FBaTXU8C3xgt/drM58VHxojHo/QoG1oPsgWTGvaSpO4=
github.com/pingcap/tipb v0.0.0-20220718022156-3e2483c20a9e/go.mod h1:A7mrd7WHBl1o63LE2bIBGEJMTNWXqhgmYiOvMLxozfs=
github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA=
Expand Down
36 changes: 27 additions & 9 deletions pkg/core/region.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,10 @@ import (
"go.uber.org/zap"
)

const randomRegionMaxRetry = 10
const (
randomRegionMaxRetry = 10
scanRegionLimit = 1000
)

// errRegionIsStale is error info for region is stale.
func errRegionIsStale(region *metapb.Region, origin *metapb.Region) error {
Expand Down Expand Up @@ -1610,16 +1613,31 @@ func (r *RegionsInfo) ScanRegionWithIterator(startKey []byte, iterator func(regi

// GetRegionSizeByRange scans regions intersecting [start key, end key), returns the total region size of this range.
func (r *RegionsInfo) GetRegionSizeByRange(startKey, endKey []byte) int64 {
r.t.RLock()
defer r.t.RUnlock()
var size int64
r.tree.scanRange(startKey, func(region *RegionInfo) bool {
if len(endKey) > 0 && bytes.Compare(region.GetStartKey(), endKey) >= 0 {
return false
for {
r.t.RLock()
var cnt int
r.tree.scanRange(startKey, func(region *RegionInfo) bool {
if len(endKey) > 0 && bytes.Compare(region.GetStartKey(), endKey) >= 0 {
return false
}
if cnt >= scanRegionLimit {
return false
}
cnt++
startKey = region.GetEndKey()
size += region.GetApproximateSize()
return true
})
r.t.RUnlock()
if cnt == 0 {
break
}
size += region.GetApproximateSize()
return true
})
if len(startKey) == 0 {
break
}
}

return size
}

Expand Down
120 changes: 120 additions & 0 deletions pkg/core/region_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,10 @@ import (
"crypto/rand"
"fmt"
"math"
mrand "math/rand"
"strconv"
"testing"
"time"

"github.com/pingcap/failpoint"
"github.com/pingcap/kvproto/pkg/metapb"
Expand Down Expand Up @@ -658,6 +660,124 @@ func BenchmarkRandomRegion(b *testing.B) {
}
}

func BenchmarkRandomSetRegion(b *testing.B) {
regions := NewRegionsInfo()
var items []*RegionInfo
for i := 0; i < 1000000; i++ {
peer := &metapb.Peer{StoreId: 1, Id: uint64(i + 1)}
region := NewRegionInfo(&metapb.Region{
Id: uint64(i + 1),
Peers: []*metapb.Peer{peer},
StartKey: []byte(fmt.Sprintf("%20d", i)),
EndKey: []byte(fmt.Sprintf("%20d", i+1)),
}, peer)
origin, overlaps, rangeChanged := regions.SetRegion(region)
regions.UpdateSubTree(region, origin, overlaps, rangeChanged)
items = append(items, region)
}
b.ResetTimer()
for i := 0; i < b.N; i++ {
item := items[i%len(items)]
item.approximateKeys = int64(200000)
item.approximateSize = int64(20)
origin, overlaps, rangeChanged := regions.SetRegion(item)
regions.UpdateSubTree(item, origin, overlaps, rangeChanged)
}
}

func TestGetRegionSizeByRange(t *testing.T) {
regions := NewRegionsInfo()
nums := 1000010
for i := 0; i < nums; i++ {
peer := &metapb.Peer{StoreId: 1, Id: uint64(i + 1)}
endKey := []byte(fmt.Sprintf("%20d", i+1))
if i == nums-1 {
endKey = []byte("")
}
region := NewRegionInfo(&metapb.Region{
Id: uint64(i + 1),
Peers: []*metapb.Peer{peer},
StartKey: []byte(fmt.Sprintf("%20d", i)),
EndKey: endKey,
}, peer, SetApproximateSize(10))
origin, overlaps, rangeChanged := regions.SetRegion(region)
regions.UpdateSubTree(region, origin, overlaps, rangeChanged)
}
totalSize := regions.GetRegionSizeByRange([]byte(""), []byte(""))
require.Equal(t, int64(nums*10), totalSize)
for i := 1; i < 10; i++ {
verifyNum := nums / i
endKey := fmt.Sprintf("%20d", verifyNum)
totalSize := regions.GetRegionSizeByRange([]byte(""), []byte(endKey))
require.Equal(t, int64(verifyNum*10), totalSize)
}
}

func BenchmarkRandomSetRegionWithGetRegionSizeByRange(b *testing.B) {
regions := NewRegionsInfo()
var items []*RegionInfo
for i := 0; i < 1000000; i++ {
peer := &metapb.Peer{StoreId: 1, Id: uint64(i + 1)}
region := NewRegionInfo(&metapb.Region{
Id: uint64(i + 1),
Peers: []*metapb.Peer{peer},
StartKey: []byte(fmt.Sprintf("%20d", i)),
EndKey: []byte(fmt.Sprintf("%20d", i+1)),
}, peer, SetApproximateSize(10))
origin, overlaps, rangeChanged := regions.SetRegion(region)
regions.UpdateSubTree(region, origin, overlaps, rangeChanged)
items = append(items, region)
}
b.ResetTimer()
go func() {
for {
regions.GetRegionSizeByRange([]byte(""), []byte(""))
time.Sleep(time.Millisecond)
}
}()
for i := 0; i < b.N; i++ {
item := items[i%len(items)]
item.approximateKeys = int64(200000)
origin, overlaps, rangeChanged := regions.SetRegion(item)
regions.UpdateSubTree(item, origin, overlaps, rangeChanged)
}
}

func BenchmarkRandomSetRegionWithGetRegionSizeByRangeParallel(b *testing.B) {
regions := NewRegionsInfo()
var items []*RegionInfo
for i := 0; i < 1000000; i++ {
peer := &metapb.Peer{StoreId: 1, Id: uint64(i + 1)}
region := NewRegionInfo(&metapb.Region{
Id: uint64(i + 1),
Peers: []*metapb.Peer{peer},
StartKey: []byte(fmt.Sprintf("%20d", i)),
EndKey: []byte(fmt.Sprintf("%20d", i+1)),
}, peer)
origin, overlaps, rangeChanged := regions.SetRegion(region)
regions.UpdateSubTree(region, origin, overlaps, rangeChanged)
items = append(items, region)
}
b.ResetTimer()
go func() {
for {
regions.GetRegionSizeByRange([]byte(""), []byte(""))
time.Sleep(time.Millisecond)
}
}()

b.RunParallel(
func(pb *testing.PB) {
for pb.Next() {
item := items[mrand.Intn(len(items))]
n := item.Clone(SetApproximateSize(20))
origin, overlaps, rangeChanged := regions.SetRegion(n)
regions.UpdateSubTree(item, origin, overlaps, rangeChanged)
}
},
)
}

const keyLength = 100

func randomBytes(n int) []byte {
Expand Down
20 changes: 20 additions & 0 deletions pkg/mcs/resourcemanager/server/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,26 @@ func (c *Config) adjustLog(meta *configutil.ConfigMetaData) {
}
}

// GetName returns the Name
func (c *Config) GetName() string {
return c.Name
}

// GeBackendEndpoints returns the BackendEndpoints
func (c *Config) GeBackendEndpoints() string {
return c.BackendEndpoints
}

// GetListenAddr returns the ListenAddr
func (c *Config) GetListenAddr() string {
return c.ListenAddr
}

// GetAdvertiseListenAddr returns the AdvertiseListenAddr
func (c *Config) GetAdvertiseListenAddr() string {
return c.AdvertiseListenAddr
}

// GetTLSConfig returns the TLS config.
func (c *Config) GetTLSConfig() *grpcutil.TLSConfig {
return &c.Security.TLSConfig
Expand Down
6 changes: 3 additions & 3 deletions pkg/mcs/resourcemanager/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -296,14 +296,14 @@ func (s *Server) startServer() (err error) {
// different service modes provided by the same pd-server binary
serverInfo.WithLabelValues(versioninfo.PDReleaseVersion, versioninfo.PDGitHash).Set(float64(time.Now().Unix()))

uniqueName := s.cfg.ListenAddr
uniqueName := s.cfg.GetAdvertiseListenAddr()
uniqueID := memberutil.GenerateUniqueID(uniqueName)
log.Info("joining primary election", zap.String("participant-name", uniqueName), zap.Uint64("participant-id", uniqueID))
s.participant = member.NewParticipant(s.GetClient(), utils.ResourceManagerServiceName)
p := &resource_manager.Participant{
Name: uniqueName,
Id: uniqueID, // id is unique among all participants
ListenUrls: []string{s.cfg.AdvertiseListenAddr},
ListenUrls: []string{s.cfg.GetAdvertiseListenAddr()},
}
s.participant.InitInfo(p, endpoint.ResourceManagerSvcRootPath(s.clusterID), utils.PrimaryKey, "primary election")

Expand All @@ -312,7 +312,7 @@ func (s *Server) startServer() (err error) {
manager: NewManager[*Server](s),
}

if err := s.InitListener(s.GetTLSConfig(), s.cfg.ListenAddr); err != nil {
if err := s.InitListener(s.GetTLSConfig(), s.cfg.GetListenAddr()); err != nil {
return err
}

Expand Down
23 changes: 21 additions & 2 deletions pkg/mcs/resourcemanager/server/token_buckets.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ import (

"github.com/gogo/protobuf/proto"
rmpb "github.com/pingcap/kvproto/pkg/resource_manager"
"github.com/pingcap/log"
"go.uber.org/zap"
)

const (
Expand All @@ -31,6 +33,7 @@ const (
defaultReserveRatio = 0.5
defaultLoanCoefficient = 2
maxAssignTokens = math.MaxFloat64 / 1024 // assume max client connect is 1024
slotExpireTimeout = 10 * time.Minute
)

// GroupTokenBucket is a token bucket for a resource group.
Expand Down Expand Up @@ -62,6 +65,7 @@ type TokenSlot struct {
// tokenCapacity is the number of tokens in the slot.
tokenCapacity float64
lastTokenCapacity float64
lastReqTime time.Time
}

// GroupTokenBucketState is the running state of TokenBucket.
Expand All @@ -75,7 +79,8 @@ type GroupTokenBucketState struct {
LastUpdate *time.Time `json:"last_update,omitempty"`
Initialized bool `json:"initialized"`
// settingChanged is used to avoid that the number of tokens returned is jitter because of changing fill rate.
settingChanged bool
settingChanged bool
lastCheckExpireSlot time.Time
}

// Clone returns the copy of GroupTokenBucketState
Expand All @@ -95,6 +100,7 @@ func (gts *GroupTokenBucketState) Clone() *GroupTokenBucketState {
Initialized: gts.Initialized,
tokenSlots: tokenSlots,
clientConsumptionTokensSum: gts.clientConsumptionTokensSum,
lastCheckExpireSlot: gts.lastCheckExpireSlot,
}
}

Expand All @@ -119,16 +125,18 @@ func (gts *GroupTokenBucketState) balanceSlotTokens(
clientUniqueID uint64,
settings *rmpb.TokenLimitSettings,
requiredToken, elapseTokens float64) {
now := time.Now()
slot, exist := gts.tokenSlots[clientUniqueID]
if !exist {
// Only slots that require a positive number will be considered alive,
// but still need to allocate the elapsed tokens as well.
if requiredToken != 0 {
slot = &TokenSlot{}
slot = &TokenSlot{lastReqTime: now}
gts.tokenSlots[clientUniqueID] = slot
gts.clientConsumptionTokensSum = 0
}
} else {
slot.lastReqTime = now
if gts.clientConsumptionTokensSum >= maxAssignTokens {
gts.clientConsumptionTokensSum = 0
}
Expand All @@ -139,6 +147,16 @@ func (gts *GroupTokenBucketState) balanceSlotTokens(
}
}

if time.Since(gts.lastCheckExpireSlot) >= slotExpireTimeout {
gts.lastCheckExpireSlot = now
for clientUniqueID, slot := range gts.tokenSlots {
if time.Since(slot.lastReqTime) >= slotExpireTimeout {
delete(gts.tokenSlots, clientUniqueID)
log.Info("delete resource group slot because expire", zap.Time("last-req-time", slot.lastReqTime),
zap.Any("expire timeout", slotExpireTimeout), zap.Any("del client id", clientUniqueID), zap.Any("len", len(gts.tokenSlots)))
}
}
}
if len(gts.tokenSlots) == 0 {
return
}
Expand Down Expand Up @@ -264,6 +282,7 @@ func (gtb *GroupTokenBucket) init(now time.Time, clientID uint64) {
lastTokenCapacity: gtb.Tokens,
}
gtb.LastUpdate = &now
gtb.lastCheckExpireSlot = now
gtb.Initialized = true
}

Expand Down
15 changes: 15 additions & 0 deletions pkg/mcs/scheduling/server/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,11 +164,26 @@ func (c *Config) adjustLog(meta *configutil.ConfigMetaData) {
}
}

// GetName returns the Name
func (c *Config) GetName() string {
return c.Name
}

// GeBackendEndpoints returns the BackendEndpoints
func (c *Config) GeBackendEndpoints() string {
return c.BackendEndpoints
}

// GetListenAddr returns the ListenAddr
func (c *Config) GetListenAddr() string {
return c.ListenAddr
}

// GetAdvertiseListenAddr returns the AdvertiseListenAddr
func (c *Config) GetAdvertiseListenAddr() string {
return c.AdvertiseListenAddr
}

// GetTLSConfig returns the TLS config.
func (c *Config) GetTLSConfig() *grpcutil.TLSConfig {
return &c.Security.TLSConfig
Expand Down
Loading

0 comments on commit aae64d7

Please sign in to comment.