Skip to content

Commit

Permalink
method2_plus
Browse files Browse the repository at this point in the history
Signed-off-by: Cabinfever_B <[email protected]>
  • Loading branch information
CabinfeverB committed Sep 28, 2023
1 parent de0ebdc commit 96028ec
Show file tree
Hide file tree
Showing 8 changed files with 141 additions and 36 deletions.
7 changes: 4 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ require (
github.com/urfave/negroni v0.3.0
go.etcd.io/etcd v0.5.0-alpha.5.0.20220915004622-85b640cee793
go.uber.org/goleak v1.2.0
go.uber.org/zap v1.25.0
go.uber.org/zap v1.26.0
golang.org/x/exp v0.0.0-20230522175609-2e198f4a06a1
golang.org/x/text v0.9.0
golang.org/x/time v0.1.0
Expand All @@ -75,7 +75,6 @@ require (
github.com/aws/aws-sdk-go-v2/service/sso v1.12.6 // indirect
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.14.6 // indirect
github.com/aws/smithy-go v1.13.5 // indirect
github.com/benbjohnson/clock v1.3.0 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/bitly/go-simplejson v0.5.0 // indirect
github.com/breeswish/gin-jwt/v2 v2.6.4-jwt-patch // indirect
Expand Down Expand Up @@ -207,4 +206,6 @@ replace google.golang.org/grpc v1.54.0 => google.golang.org/grpc v1.26.0
// After the PR to kvproto is merged, remember to comment this out and run `go mod tidy`.
// replace github.com/pingcap/kvproto => github.com/$YourPrivateRepo $YourPrivateBranch

replace github.com/go-kratos/aegis => github.com/CabinfeverB/aegis v0.0.0-20230915011742-cfc84eb2b73c
replace github.com/go-kratos/aegis => github.com/CabinfeverB/aegis v0.0.0-20230928030530-c7569b8a9aeb

// replace github.com/go-kratos/aegis => /Users/jiangyongbo/github/aegis
10 changes: 4 additions & 6 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ github.com/AlekSi/gocov-xml v1.0.0 h1:4QctJBgXEkbzeKz6PJy6bt3JSPNSN4I2mITYW+eKUo
github.com/AlekSi/gocov-xml v1.0.0/go.mod h1:J0qYeZ6tDg4oZubW9mAAgxlqw39PDfoEkzB3HXSbEuA=
github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/CabinfeverB/aegis v0.0.0-20230915011742-cfc84eb2b73c h1:CoKbGQWJXZqk4CqPBSeV7te7t3Q1KPKlA9OWc9IVI58=
github.com/CabinfeverB/aegis v0.0.0-20230915011742-cfc84eb2b73c/go.mod h1:Or+Oi/LPw6gLU5Ha3kKb0UyxVFGlHLON0E0zJ3pryXs=
github.com/CabinfeverB/aegis v0.0.0-20230928030530-c7569b8a9aeb h1:8ToNpO0H5HlNE8Lw6+DEDwjoIQlZOgOBltX7heUdrro=
github.com/CabinfeverB/aegis v0.0.0-20230928030530-c7569b8a9aeb/go.mod h1:SGNvWyaAstbIL3bVRiYUFEKvFLCUk/j6sa/zV4X5elk=
github.com/KyleBanks/depth v1.2.1 h1:5h8fQADFrWtarTdtDudMmGsC7GPbOAu6RVB3ffsVFHc=
github.com/KyleBanks/depth v1.2.1/go.mod h1:jzSb9d0L43HxTQfT+oSA1EEp2q+ne2uh6XgeJcm8brE=
github.com/Masterminds/semver v1.5.0 h1:H65muMkzWKEuNDnfl9d70GUjFniHKHRbFPGBuZ3QEww=
Expand Down Expand Up @@ -61,8 +61,6 @@ github.com/aws/smithy-go v1.13.5/go.mod h1:Tg+OJXh4MB2R/uN61Ko2f6hTZwB/ZYGOtib8J
github.com/axw/gocov v1.0.0 h1:YsqYR66hUmilVr23tu8USgnJIJvnwh3n7j5zRn7x4LU=
github.com/axw/gocov v1.0.0/go.mod h1:LvQpEYiwwIb2nYkXY2fDWhg9/AsYqkhmrCshjlUJECE=
github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA=
github.com/benbjohnson/clock v1.3.0 h1:ip6w0uFQkncKQ979AypyG0ER7mqUSBdKLOgAle/AT8A=
github.com/benbjohnson/clock v1.3.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA=
github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q=
github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8=
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
Expand Down Expand Up @@ -642,8 +640,8 @@ go.uber.org/zap v1.10.0/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q=
go.uber.org/zap v1.12.0/go.mod h1:zwrFLgMcdUuIBviXEYEH1YKNaOBnKXsx2IPda5bBwHM=
go.uber.org/zap v1.16.0/go.mod h1:MA8QOfq0BHJwdXa996Y4dYkAqRKB8/1K1QMMZVaNZjQ=
go.uber.org/zap v1.19.0/go.mod h1:xg/QME4nWcxGxrpdeYfq7UvYrLh66cuVKdrbD1XF/NI=
go.uber.org/zap v1.25.0 h1:4Hvk6GtkucQ790dqmj7l1eEnRdKm3k3ZUrUMS2d5+5c=
go.uber.org/zap v1.25.0/go.mod h1:JIAUzQIH94IC4fOJQm7gMmBJP5k7wQfdcnYdPoEXJYk=
go.uber.org/zap v1.26.0 h1:sI7k6L95XOKS281NhVKOFCUNIvv9e0w4BF8N3u+tCRo=
go.uber.org/zap v1.26.0/go.mod h1:dtElttAiwGvoJ/vj4IwHBS/gXsEu/pZ50mUIRWuG0so=
golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20190510104115-cbcb75029529/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
Expand Down
133 changes: 119 additions & 14 deletions server/grpc_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,14 +75,119 @@ var (
ErrEtcdNotStarted = status.Errorf(codes.Unavailable, "server is started, but etcd not started")
)

const (
limiterPriorityLen = 3
minPenaltyFactor = 0.1
factor = 0.9
)

type priorityManager struct {
limiters []*bbr.BBR
limiterManager *limiterManager

penaltyFactor float64
}

func newPriorityManager(m *limiterManager) *priorityManager {
return &priorityManager{
limiterManager: m,
penaltyFactor: 1.0,
}
}

func (p *priorityManager) tryUpdatePenalty() {
p.penaltyFactor = 1.
for _, l := range p.limiters {
p.penaltyFactor *= l.GetBroadcasePenalty()
if p.penaltyFactor < 0.1 {
p.penaltyFactor = 0.1
}
}
}

func (p *priorityManager) setPenalty(penalty float64) {
for _, limiter := range p.limiters {
limiter.SetPenalty(penalty)
}
}

type limiterManager struct {
storeHeartbeatlimiter *bbr.BBR
getRegionlimiter *bbr.BBR

priorityManagers []*priorityManager
}

func createLimiterManager() *limiterManager {
storeHeartbeatlimiter := bbr.NewLimiter(bbr.WithCPUThreshold(800), bbr.WithName("storeHeartbeat"))
getRegionlimiter := bbr.NewLimiter(bbr.WithCPUThreshold(800), bbr.WithName("getRegion"))
priorityManagers := make([]*priorityManager, limiterPriorityLen)
l := &limiterManager{
storeHeartbeatlimiter: storeHeartbeatlimiter,
getRegionlimiter: getRegionlimiter,
priorityManagers: priorityManagers,
}

for i := range priorityManagers {
priorityManagers[i] = newPriorityManager(l)
}
priorityManagers[0].limiters = append(priorityManagers[0].limiters, storeHeartbeatlimiter)
priorityManagers[1].limiters = append(priorityManagers[1].limiters, getRegionlimiter)

go l.checkPenalty()
go l.printStats()
return l
}

func (l *limiterManager) printStats() {
ticker := time.NewTicker(time.Second * 2)
defer func() {
ticker.Stop()
log.Info("printStats exit")
}()
for range ticker.C {
log.Info("storeHeartbeat limiter stat", zap.Any("Stat", l.storeHeartbeatlimiter.Stat()))
log.Info("gerRegion limiter stat", zap.Any("Stat", l.getRegionlimiter.Stat()))
}
}

func (l *limiterManager) checkPenalty() {
ticker := time.NewTicker(time.Second)
defer ticker.Stop()
for range ticker.C {
for _, pm := range l.priorityManagers {
pm.tryUpdatePenalty()
}
l.updatePenalty()
}
}

func (l *limiterManager) updatePenalty() {
penaltyFactor := 1.
for _, pm := range l.priorityManagers {
pm.setPenalty(penaltyFactor)
penaltyFactor *= pm.penaltyFactor
}
}

// GrpcServer wraps Server to provide grpc service.
type GrpcServer struct {
*Server
concurrentTSOProxyStreamings atomic.Int32
limiter *bbr.BBR
t time.Time
cnt atomic.Int64
storeHeartbeatLock sync.Mutex

limiterManager *limiterManager

// for test
t time.Time
storeHeartbeatLock sync.Mutex
}

func createGrpcServer(s *Server) *GrpcServer {
return &GrpcServer{
Server: s,
limiterManager: createLimiterManager(),
t: time.Now(),
}
}

type request interface {
Expand Down Expand Up @@ -949,17 +1054,10 @@ func (s *GrpcServer) StoreHeartbeat(ctx context.Context, request *pdpb.StoreHear
if request.GetStats() == nil {
return nil, errors.Errorf("invalid store heartbeat command, but %v", request)
}
s.cnt.Add(1)
done, err2 := s.limiter.Allow()
done, err2 := s.limiterManager.storeHeartbeatlimiter.Allow()
if err2 != nil {
if s.cnt.Load()%10000 == 0 {
log.Info("Stat err", zap.Any("Stat", s.limiter.Stat()))
}
rateLimitCounter.WithLabelValues("StoreHeartbeat").Add(1)
return nil, errs.ErrRateLimitExceeded.FastGenByArgs()
} else {
if s.cnt.Load()%10000 == 0 {
log.Info("Stat success", zap.Any("Stat", s.limiter.Stat()))
}
}
failpoint.Inject("SlowStoreHeartbeat", func(val failpoint.Value) {
s.storeHeartbeatLock.Lock()
Expand Down Expand Up @@ -1348,10 +1446,17 @@ func (s *GrpcServer) GetRegion(ctx context.Context, request *pdpb.GetRegionReque
} else if rsp != nil {
return rsp.(*pdpb.GetRegionResponse), nil
}
done, err2 := s.limiter.Allow()
done, err2 := s.limiterManager.getRegionlimiter.Allow()
if err2 != nil {
rateLimitCounter.WithLabelValues("GetRegion").Add(1)
return nil, errs.ErrRateLimitExceeded.FastGenByArgs()
}
failpoint.Inject("SlowStoreHeartbeat", func(val failpoint.Value) {
s.storeHeartbeatLock.Lock()
d := val.(int)
time.Sleep(time.Duration(d) * time.Microsecond * 10)
s.storeHeartbeatLock.Unlock()
})
rc := s.GetRaftCluster()
if rc == nil {
return &pdpb.GetRegionResponse{Header: s.notBootstrappedHeader()}, nil
Expand Down
9 changes: 9 additions & 0 deletions server/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,14 @@ var (
Name: "maxprocs",
Help: "The value of GOMAXPROCS.",
})

rateLimitCounter = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: "pd",
Subsystem: "server",
Name: "rate_limit",
Help: "Counter of region heartbeat.",
}, []string{"api"})
)

func init() {
Expand All @@ -179,4 +187,5 @@ func init() {
prometheus.MustRegister(serviceAuditHistogram)
prometheus.MustRegister(bucketReportInterval)
prometheus.MustRegister(serverMaxProcs)
prometheus.MustRegister(rateLimitCounter)
}
7 changes: 1 addition & 6 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ import (
"time"

"github.com/coreos/go-semver/semver"
"github.com/go-kratos/aegis/ratelimit/bbr"
"github.com/gogo/protobuf/proto"
"github.com/gorilla/mux"
"github.com/pingcap/errors"
Expand Down Expand Up @@ -301,11 +300,7 @@ func CreateServer(ctx context.Context, cfg *config.Config, services []string, le
s.registry.InstallAllRESTHandler(s, etcdCfg.UserHandlers)

etcdCfg.ServiceRegister = func(gs *grpc.Server) {
grpcServer := &GrpcServer{
Server: s,
limiter: bbr.NewLimiter(bbr.WithCPUThreshold(800)),
t: time.Now(),
}
grpcServer := createGrpcServer(s)
pdpb.RegisterPDServer(gs, grpcServer)
keyspacepb.RegisterKeyspaceServer(gs, &KeyspaceServer{GrpcServer: grpcServer})
diagnosticspb.RegisterDiagnosticsServer(gs, s)
Expand Down
2 changes: 1 addition & 1 deletion tools/pd-api-bench/cases/cases.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,7 @@ func newStoreHeartbeat() *storeHeartbeat {
return &storeHeartbeat{
baseCase: &baseCase{
name: "StoreHeartbeat",
qps: 100,
qps: 10000,
burst: 1,
},
}
Expand Down
3 changes: 1 addition & 2 deletions tools/pd-api-bench/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ require (
github.com/pingcap/kvproto v0.0.0-20230905082026-5336fac26974
github.com/tikv/pd v0.0.0-00010101000000-000000000000
github.com/tikv/pd/client v0.0.0-00010101000000-000000000000
go.uber.org/zap v1.25.0
go.uber.org/zap v1.26.0
google.golang.org/grpc v1.54.0
)

Expand All @@ -26,7 +26,6 @@ require (
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.14.6 // indirect
github.com/aws/aws-sdk-go-v2/service/sts v1.18.7 // indirect
github.com/aws/smithy-go v1.13.5 // indirect
github.com/benbjohnson/clock v1.3.0 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/cakturk/go-netstat v0.0.0-20200220111822-e5b49efee7a5 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
Expand Down
6 changes: 2 additions & 4 deletions tools/pd-api-bench/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,6 @@ github.com/aws/aws-sdk-go-v2/service/sts v1.18.7/go.mod h1:JuTnSoeePXmMVe9G8Ncjj
github.com/aws/smithy-go v1.13.5 h1:hgz0X/DX0dGqTYpGALqXJoRKRj5oQ7150i5FdTePzO8=
github.com/aws/smithy-go v1.13.5/go.mod h1:Tg+OJXh4MB2R/uN61Ko2f6hTZwB/ZYGOtib8J3gBHzA=
github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA=
github.com/benbjohnson/clock v1.3.0 h1:ip6w0uFQkncKQ979AypyG0ER7mqUSBdKLOgAle/AT8A=
github.com/benbjohnson/clock v1.3.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA=
github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q=
github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8=
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
Expand Down Expand Up @@ -409,8 +407,8 @@ go.uber.org/zap v1.9.1/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q=
go.uber.org/zap v1.10.0/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q=
go.uber.org/zap v1.12.0/go.mod h1:zwrFLgMcdUuIBviXEYEH1YKNaOBnKXsx2IPda5bBwHM=
go.uber.org/zap v1.19.0/go.mod h1:xg/QME4nWcxGxrpdeYfq7UvYrLh66cuVKdrbD1XF/NI=
go.uber.org/zap v1.25.0 h1:4Hvk6GtkucQ790dqmj7l1eEnRdKm3k3ZUrUMS2d5+5c=
go.uber.org/zap v1.25.0/go.mod h1:JIAUzQIH94IC4fOJQm7gMmBJP5k7wQfdcnYdPoEXJYk=
go.uber.org/zap v1.26.0 h1:sI7k6L95XOKS281NhVKOFCUNIvv9e0w4BF8N3u+tCRo=
go.uber.org/zap v1.26.0/go.mod h1:dtElttAiwGvoJ/vj4IwHBS/gXsEu/pZ50mUIRWuG0so=
golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20190510104115-cbcb75029529/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
Expand Down

0 comments on commit 96028ec

Please sign in to comment.