Skip to content

Commit

Permalink
storelimit: fix datarace from getOrCreateStoreLimit (#8254) (#8258)
Browse files Browse the repository at this point in the history
close #8253

Signed-off-by: lhy1024 <[email protected]>

Co-authored-by: lhy1024 <[email protected]>
  • Loading branch information
ti-chi-bot and lhy1024 committed Jun 6, 2024
1 parent cff590e commit d1bf242
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 3 deletions.
20 changes: 17 additions & 3 deletions pkg/core/storelimit/store_limit.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package storelimit
import (
"github.com/tikv/pd/pkg/core/constant"
"github.com/tikv/pd/pkg/ratelimit"
"github.com/tikv/pd/pkg/utils/syncutil"
)

const (
Expand Down Expand Up @@ -106,7 +107,7 @@ func (l *StoreRateLimit) Rate(typ Type) float64 {
if l.limits[typ] == nil {
return 0.0
}
return l.limits[typ].ratePerSec
return l.limits[typ].GetRatePerSec()
}

// Take takes count tokens from the bucket without blocking.
Expand All @@ -128,12 +129,15 @@ func (l *StoreRateLimit) Reset(rate float64, typ Type) {

// limit the operators of a store
type limit struct {
limiter *ratelimit.RateLimiter
ratePerSec float64
limiter *ratelimit.RateLimiter
ratePerSecMutex syncutil.RWMutex
ratePerSec float64
}

// Reset resets the rate limit.
func (l *limit) Reset(ratePerSec float64) {
l.ratePerSecMutex.Lock()
defer l.ratePerSecMutex.Unlock()
if l.ratePerSec == ratePerSec {
return
}
Expand All @@ -155,6 +159,8 @@ func (l *limit) Reset(ratePerSec float64) {
// Available returns the number of available tokens
// It returns true if the rate per second is zero.
func (l *limit) Available(n int64) bool {
l.ratePerSecMutex.RLock()
defer l.ratePerSecMutex.RUnlock()
if l.ratePerSec == 0 {
return true
}
Expand All @@ -164,8 +170,16 @@ func (l *limit) Available(n int64) bool {

// Take takes count tokens from the bucket without blocking.
func (l *limit) Take(count int64) bool {
l.ratePerSecMutex.RLock()
defer l.ratePerSecMutex.RUnlock()
if l.ratePerSec == 0 {
return true
}
return l.limiter.AllowN(int(count))
}

func (l *limit) GetRatePerSec() float64 {
l.ratePerSecMutex.RLock()
defer l.ratePerSecMutex.RUnlock()
return l.ratePerSec
}
37 changes: 37 additions & 0 deletions pkg/schedule/operator/operator_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -955,3 +955,40 @@ func (suite *operatorControllerTestSuite) TestInvalidStoreId() {
// Although store 3 does not exist in PD, PD can also send op to TiKV.
re.Equal(pdpb.OperatorStatus_RUNNING, oc.GetOperatorStatus(1).Status)
}

func TestConcurrentAddOperatorAndSetStoreLimit(t *testing.T) {
re := require.New(t)
opt := mockconfig.NewTestOptions()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
tc := mockcluster.NewCluster(ctx, opt)
stream := hbstream.NewTestHeartbeatStreams(ctx, tc.ID, tc, false /* no need to run */)
oc := NewController(ctx, tc.GetBasicCluster(), tc.GetSharedConfig(), stream)

regionNum := 1000
limit := 1600.0
storeID := uint64(2)
for i := 1; i < 4; i++ {
tc.AddRegionStore(uint64(i), regionNum)
tc.SetStoreLimit(uint64(i), storelimit.AddPeer, limit)
}
for i := 1; i <= regionNum; i++ {
tc.AddLeaderRegion(uint64(i), 1, 3, 4)
}

// Add operator and set store limit concurrently
var wg sync.WaitGroup
for i := 1; i < 10; i++ {
wg.Add(1)
go func(i uint64) {
defer wg.Done()
for j := 1; j < 10; j++ {
regionID := uint64(j) + i*100
op := NewTestOperator(regionID, tc.GetRegion(regionID).GetRegionEpoch(), OpRegion, AddPeer{ToStore: storeID, PeerID: regionID})
re.True(oc.AddOperator(op))
tc.SetStoreLimit(storeID, storelimit.AddPeer, limit-float64(j)) // every goroutine set a different limit
}
}(uint64(i))
}
wg.Wait()
}

0 comments on commit d1bf242

Please sign in to comment.