Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[YUNIKORN-2582]Update atomic.Int32 for atomic operations #899

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 16 additions & 20 deletions pkg/common/test/schedulerapi_mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,10 @@
)

type SchedulerAPIMock struct {
registerCount int32
UpdateAllocationCount int32
UpdateApplicationCount int32
UpdateNodeCount int32
registerCount atomic.Int32
UpdateAllocationCount atomic.Int32
UpdateApplicationCount atomic.Int32
UpdateNodeCount atomic.Int32
registerFn func(request *si.RegisterResourceManagerRequest,
callback api.ResourceManagerCallback) (*si.RegisterResourceManagerResponse, error)
UpdateAllocationFn func(request *si.AllocationRequest) error
Expand All @@ -41,10 +41,6 @@

func NewSchedulerAPIMock() *SchedulerAPIMock {
return &SchedulerAPIMock{
registerCount: int32(0),
UpdateAllocationCount: int32(0),
UpdateApplicationCount: int32(0),
UpdateNodeCount: int32(0),
registerFn: func(request *si.RegisterResourceManagerRequest,
callback api.ResourceManagerCallback) (response *si.RegisterResourceManagerResponse, e error) {
return nil, nil
Expand Down Expand Up @@ -93,28 +89,28 @@
callback api.ResourceManagerCallback) (*si.RegisterResourceManagerResponse, error) {
api.lock.Lock()
defer api.lock.Unlock()
atomic.AddInt32(&api.registerCount, 1)
api.registerCount.Add(1)
return api.registerFn(request, callback)
}

func (api *SchedulerAPIMock) UpdateAllocation(request *si.AllocationRequest) error {
api.lock.Lock()
defer api.lock.Unlock()
atomic.AddInt32(&api.UpdateAllocationCount, 1)
api.UpdateAllocationCount.Add(1)
return api.UpdateAllocationFn(request)
}

func (api *SchedulerAPIMock) UpdateApplication(request *si.ApplicationRequest) error {
api.lock.Lock()
defer api.lock.Unlock()
atomic.AddInt32(&api.UpdateApplicationCount, 1)
api.UpdateApplicationCount.Add(1)
return api.UpdateApplicationFn(request)
}

func (api *SchedulerAPIMock) UpdateNode(request *si.NodeRequest) error {
api.lock.Lock()
defer api.lock.Unlock()
atomic.AddInt32(&api.UpdateNodeCount, 1)
api.UpdateNodeCount.Add(1)
return api.UpdateNodeFn(request)
}

Expand All @@ -125,26 +121,26 @@
}

func (api *SchedulerAPIMock) GetRegisterCount() int32 {
return atomic.LoadInt32(&api.registerCount)
return api.registerCount.Load()

Check warning on line 124 in pkg/common/test/schedulerapi_mock.go

View check run for this annotation

Codecov / codecov/patch

pkg/common/test/schedulerapi_mock.go#L124

Added line #L124 was not covered by tests
}

func (api *SchedulerAPIMock) GetUpdateAllocationCount() int32 {
return atomic.LoadInt32(&api.UpdateAllocationCount)
return api.UpdateAllocationCount.Load()
}

func (api *SchedulerAPIMock) GetUpdateApplicationCount() int32 {
return atomic.LoadInt32(&api.UpdateApplicationCount)
return api.UpdateApplicationCount.Load()

Check warning on line 132 in pkg/common/test/schedulerapi_mock.go

View check run for this annotation

Codecov / codecov/patch

pkg/common/test/schedulerapi_mock.go#L132

Added line #L132 was not covered by tests
}

func (api *SchedulerAPIMock) GetUpdateNodeCount() int32 {
return atomic.LoadInt32(&api.UpdateNodeCount)
return api.UpdateNodeCount.Load()

Check warning on line 136 in pkg/common/test/schedulerapi_mock.go

View check run for this annotation

Codecov / codecov/patch

pkg/common/test/schedulerapi_mock.go#L136

Added line #L136 was not covered by tests
}

func (api *SchedulerAPIMock) ResetAllCounters() {
atomic.StoreInt32(&api.registerCount, 0)
atomic.StoreInt32(&api.UpdateAllocationCount, 0)
atomic.StoreInt32(&api.UpdateApplicationCount, 0)
atomic.StoreInt32(&api.UpdateNodeCount, 0)
api.registerCount.Store(0)
api.UpdateAllocationCount.Store(0)
api.UpdateApplicationCount.Store(0)
api.UpdateNodeCount.Store(0)

Check warning on line 143 in pkg/common/test/schedulerapi_mock.go

View check run for this annotation

Codecov / codecov/patch

pkg/common/test/schedulerapi_mock.go#L140-L143

Added lines #L140 - L143 were not covered by tests
}

func (api *SchedulerAPIMock) Stop() {
Expand Down
9 changes: 4 additions & 5 deletions pkg/dispatcher/dispatch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
"fmt"
"runtime"
"strings"
"sync/atomic"
"testing"
"time"

Expand Down Expand Up @@ -188,7 +187,7 @@ func TestEventWillNotBeLostWhenEventChannelIsFull(t *testing.T) {
}

// check event channel is full and some events are dispatched asynchronously
assert.Assert(t, atomic.LoadInt32(&asyncDispatchCount) > 0)
assert.Assert(t, asyncDispatchCount.Load() > 0)

// wait until all events are handled
dispatcher.drain()
Expand All @@ -198,7 +197,7 @@ func TestEventWillNotBeLostWhenEventChannelIsFull(t *testing.T) {

// assert all event are handled
assert.Equal(t, recorder.size(), numEvents)
assert.Assert(t, atomic.LoadInt32(&asyncDispatchCount) == 0)
assert.Assert(t, asyncDispatchCount.Load() == 0)

// ensure state is stopped
assert.Equal(t, dispatcher.isRunning(), false)
Expand Down Expand Up @@ -241,7 +240,7 @@ func TestDispatchTimeout(t *testing.T) {
// 2nd one should be added to the channel
// 3rd one should be posted as an async request
time.Sleep(100 * time.Millisecond)
assert.Equal(t, atomic.LoadInt32(&asyncDispatchCount), int32(1))
assert.Equal(t, asyncDispatchCount.Load(), int32(1))

// verify Dispatcher#asyncDispatch is called
buf := make([]byte, 1<<16)
Expand All @@ -250,7 +249,7 @@ func TestDispatchTimeout(t *testing.T) {

// wait until async dispatch routine times out
err := utils.WaitForCondition(func() bool {
return atomic.LoadInt32(&asyncDispatchCount) == int32(0)
return asyncDispatchCount.Load() == int32(0)
}, 100*time.Millisecond, DispatchTimeout+AsyncDispatchCheckInterval)
assert.NilError(t, err)

Expand Down
6 changes: 3 additions & 3 deletions pkg/dispatcher/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ var (
AsyncDispatchLimit int32
AsyncDispatchCheckInterval = 3 * time.Second
DispatchTimeout time.Duration
asyncDispatchCount int32 = 0
asyncDispatchCount atomic.Int32 = atomic.Int32{}
)

// central dispatcher that dispatches scheduling events.
Expand Down Expand Up @@ -169,14 +169,14 @@ func (p *Dispatcher) dispatch(event events.SchedulingEvent) error {
// async-dispatch try to enqueue the event in every 3 seconds util timeout,
// it's only called when event channel is full.
func (p *Dispatcher) asyncDispatch(event events.SchedulingEvent) {
count := atomic.AddInt32(&asyncDispatchCount, 1)
count := asyncDispatchCount.Add(1)
log.Log(log.ShimDispatcher).Warn("event channel is full, transition to async-dispatch mode",
zap.Int32("asyncDispatchCount", count))
if count > AsyncDispatchLimit {
panic(fmt.Errorf("dispatcher exceeds async-dispatch limit"))
}
go func(beginTime time.Time, stop chan struct{}) {
defer atomic.AddInt32(&asyncDispatchCount, -1)
defer asyncDispatchCount.Add(-1)
for p.isRunning() {
select {
case <-stop:
Expand Down
Loading