diff --git a/pkg/core/storelimit/store_limit.go b/pkg/core/storelimit/store_limit.go index 8d70b2918a1..e35ec773d80 100644 --- a/pkg/core/storelimit/store_limit.go +++ b/pkg/core/storelimit/store_limit.go @@ -17,6 +17,7 @@ package storelimit import ( "github.com/tikv/pd/pkg/core/constant" "github.com/tikv/pd/pkg/ratelimit" + "github.com/tikv/pd/pkg/utils/syncutil" ) const ( @@ -106,7 +107,7 @@ func (l *StoreRateLimit) Rate(typ Type) float64 { if l.limits[typ] == nil { return 0.0 } - return l.limits[typ].ratePerSec + return l.limits[typ].GetRatePerSec() } // Take takes count tokens from the bucket without blocking. @@ -128,12 +129,15 @@ func (l *StoreRateLimit) Reset(rate float64, typ Type) { // limit the operators of a store type limit struct { - limiter *ratelimit.RateLimiter - ratePerSec float64 + limiter *ratelimit.RateLimiter + ratePerSecMutex syncutil.RWMutex + ratePerSec float64 } // Reset resets the rate limit. func (l *limit) Reset(ratePerSec float64) { + l.ratePerSecMutex.Lock() + defer l.ratePerSecMutex.Unlock() if l.ratePerSec == ratePerSec { return } @@ -155,6 +159,8 @@ func (l *limit) Reset(ratePerSec float64) { // Available returns the number of available tokens // It returns true if the rate per second is zero. func (l *limit) Available(n int64) bool { + l.ratePerSecMutex.RLock() + defer l.ratePerSecMutex.RUnlock() if l.ratePerSec == 0 { return true } @@ -164,8 +170,16 @@ func (l *limit) Available(n int64) bool { // Take takes count tokens from the bucket without blocking. func (l *limit) Take(count int64) bool { + l.ratePerSecMutex.RLock() + defer l.ratePerSecMutex.RUnlock() if l.ratePerSec == 0 { return true } return l.limiter.AllowN(int(count)) } + +func (l *limit) GetRatePerSec() float64 { + l.ratePerSecMutex.RLock() + defer l.ratePerSecMutex.RUnlock() + return l.ratePerSec +} diff --git a/pkg/schedule/operator/operator_controller_test.go b/pkg/schedule/operator/operator_controller_test.go index d3c50667fe0..2b16516c4c7 100644 --- a/pkg/schedule/operator/operator_controller_test.go +++ b/pkg/schedule/operator/operator_controller_test.go @@ -955,3 +955,40 @@ func (suite *operatorControllerTestSuite) TestInvalidStoreId() { // Although store 3 does not exist in PD, PD can also send op to TiKV. re.Equal(pdpb.OperatorStatus_RUNNING, oc.GetOperatorStatus(1).Status) } + +func TestConcurrentAddOperatorAndSetStoreLimit(t *testing.T) { + re := require.New(t) + opt := mockconfig.NewTestOptions() + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + tc := mockcluster.NewCluster(ctx, opt) + stream := hbstream.NewTestHeartbeatStreams(ctx, tc.ID, tc, false /* no need to run */) + oc := NewController(ctx, tc.GetBasicCluster(), tc.GetSharedConfig(), stream) + + regionNum := 1000 + limit := 1600.0 + storeID := uint64(2) + for i := 1; i < 4; i++ { + tc.AddRegionStore(uint64(i), regionNum) + tc.SetStoreLimit(uint64(i), storelimit.AddPeer, limit) + } + for i := 1; i <= regionNum; i++ { + tc.AddLeaderRegion(uint64(i), 1, 3, 4) + } + + // Add operator and set store limit concurrently + var wg sync.WaitGroup + for i := 1; i < 10; i++ { + wg.Add(1) + go func(i uint64) { + defer wg.Done() + for j := 1; j < 10; j++ { + regionID := uint64(j) + i*100 + op := NewTestOperator(regionID, tc.GetRegion(regionID).GetRegionEpoch(), OpRegion, AddPeer{ToStore: storeID, PeerID: regionID}) + re.True(oc.AddOperator(op)) + tc.SetStoreLimit(storeID, storelimit.AddPeer, limit-float64(j)) // every goroutine set a different limit + } + }(uint64(i)) + } + wg.Wait() +}