*: use syncutil lock (tikv#7157)
ref tikv#4399

Signed-off-by: Ryan Leung <[email protected]>

Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com>
rleungx and ti-chi-bot[bot] committed Sep 27, 2023
1 parent 985e2a4 commit 8e844d8
Showing 23 changed files with 67 additions and 51 deletions.
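The change is mechanical: each file drops the standard "sync" import in favor of "github.com/tikv/pd/pkg/utils/syncutil" and replaces sync.Mutex / sync.RWMutex fields with the syncutil equivalents, leaving the locking call sites untouched. As a hedged sketch only (not taken from this commit), a wrapper package like syncutil typically just embeds the standard-library mutexes in its default build, which is what makes the rename a drop-in change, while an alternative build tag can swap in a deadlock-detecting implementation:

// Hypothetical sketch of a minimal syncutil; the real pkg/utils/syncutil
// may differ (for example, a build tag selecting a deadlock-detecting mutex).
package syncutil

import "sync"

// Mutex is a drop-in replacement for sync.Mutex.
type Mutex struct {
	sync.Mutex
}

// RWMutex is a drop-in replacement for sync.RWMutex.
type RWMutex struct {
	sync.RWMutex
}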
5 changes: 3 additions & 2 deletions pkg/balancer/round_robin.go
@@ -15,13 +15,14 @@
package balancer

import (
"sync"
"sync/atomic"

"github.com/tikv/pd/pkg/utils/syncutil"
)

// RoundRobin is a balancer that selects nodes in a round-robin fashion.
type RoundRobin[T uint32 | string] struct {
sync.RWMutex
syncutil.RWMutex
nodes []T
exists map[T]struct{}
next uint32
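Because RoundRobin embeds the mutex as an anonymous field, the promoted Lock/RLock methods keep working after the swap; only the field declaration above changes. A hedged illustration (the method below is hypothetical, not part of this diff):

// Hypothetical method, for illustration only: Lock/Unlock are promoted from
// the embedded syncutil.RWMutex, so callers are unaffected by the type swap.
func (r *RoundRobin[T]) Put(node T) {
	r.Lock()
	defer r.Unlock()
	if _, ok := r.exists[node]; !ok {
		r.exists[node] = struct{}{}
		r.nodes = append(r.nodes, node)
	}
}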
5 changes: 3 additions & 2 deletions pkg/btree/btree_generic.go
@@ -78,7 +78,8 @@ package btree

import (
"sort"
"sync"

"github.com/tikv/pd/pkg/utils/syncutil"
)

// Item represents a single object in the tree.
@@ -101,7 +102,7 @@ const (
// FreeList, in particular when they're created with Clone.
// Two Btrees using the same freelist are safe for concurrent write access.
type FreeListG[T Item[T]] struct {
mu sync.Mutex
mu syncutil.Mutex
freelist []*node[T]
}

6 changes: 4 additions & 2 deletions pkg/btree/btree_generic_test.go
@@ -36,6 +36,8 @@ import (
"sort"
"sync"
"testing"

"github.com/tikv/pd/pkg/utils/syncutil"
)

// perm returns a random permutation of n Int items in the range [0, n).
@@ -752,7 +754,7 @@ func BenchmarkDescendLessOrEqual(b *testing.B) {

const cloneTestSize = 10000

func cloneTestG[T Item[T]](t *testing.T, b *BTreeG[T], start int, p []T, wg *sync.WaitGroup, trees *[]*BTreeG[T], lock *sync.Mutex) {
func cloneTestG[T Item[T]](t *testing.T, b *BTreeG[T], start int, p []T, wg *sync.WaitGroup, trees *[]*BTreeG[T], lock *syncutil.Mutex) {
t.Logf("Starting new clone at %v", start)
lock.Lock()
*trees = append(*trees, b)
@@ -773,7 +775,7 @@ func TestCloneConcurrentOperationsG(t *testing.T) {
p := perm(cloneTestSize)
var wg sync.WaitGroup
wg.Add(1)
go cloneTestG(t, b, 0, p, &wg, &trees, &sync.Mutex{})
go cloneTestG(t, b, 0, p, &wg, &trees, &syncutil.Mutex{})
wg.Wait()
want := rang(cloneTestSize)
t.Logf("Starting equality checks on %d trees", len(trees))
4 changes: 2 additions & 2 deletions pkg/election/leadership.go
@@ -16,7 +16,6 @@ package election

import (
"context"
"sync"
"sync/atomic"
"time"

@@ -27,6 +26,7 @@ import (
"github.com/tikv/pd/pkg/storage/kv"
"github.com/tikv/pd/pkg/utils/etcdutil"
"github.com/tikv/pd/pkg/utils/grpcutil"
"github.com/tikv/pd/pkg/utils/syncutil"
"go.etcd.io/etcd/clientv3"
"go.etcd.io/etcd/mvcc/mvccpb"
"go.uber.org/zap"
@@ -61,7 +61,7 @@ type Leadership struct {

keepAliveCtx context.Context
keepAliveCancelFunc context.CancelFunc
keepAliveCancelFuncLock sync.Mutex
keepAliveCancelFuncLock syncutil.Mutex
}

// NewLeadership creates a new Leadership.
3 changes: 2 additions & 1 deletion pkg/keyspace/tso_keyspace_group.go
@@ -35,6 +35,7 @@ import (
"github.com/tikv/pd/pkg/storage/kv"
"github.com/tikv/pd/pkg/utils/etcdutil"
"github.com/tikv/pd/pkg/utils/logutil"
"github.com/tikv/pd/pkg/utils/syncutil"
"go.etcd.io/etcd/clientv3"
"go.etcd.io/etcd/mvcc/mvccpb"
"go.uber.org/zap"
@@ -60,7 +61,7 @@ type GroupManager struct {
client *clientv3.Client
clusterID uint64

sync.RWMutex
syncutil.RWMutex
// groups is the cache of keyspace group related information.
// user kind -> keyspace group
groups map[endpoint.UserKind]*indexedHeap
4 changes: 2 additions & 2 deletions pkg/mcs/resourcemanager/server/manager.go
@@ -20,7 +20,6 @@ import (
"math"
"sort"
"strings"
"sync"
"time"

"github.com/gogo/protobuf/proto"
@@ -34,6 +33,7 @@ import (
"github.com/tikv/pd/pkg/storage/kv"
"github.com/tikv/pd/pkg/utils/jsonutil"
"github.com/tikv/pd/pkg/utils/logutil"
"github.com/tikv/pd/pkg/utils/syncutil"
"go.uber.org/zap"
)

@@ -49,7 +49,7 @@ const (

// Manager is the manager of resource group.
type Manager struct {
sync.RWMutex
syncutil.RWMutex
srv bs.Server
controllerConfig *ControllerConfig
groups map[string]*ResourceGroup
4 changes: 2 additions & 2 deletions pkg/mcs/resourcemanager/server/resource_group.go
@@ -17,19 +17,19 @@ package server

import (
"encoding/json"
"sync"
"time"

"github.com/pingcap/errors"
rmpb "github.com/pingcap/kvproto/pkg/resource_manager"
"github.com/pingcap/log"
"github.com/tikv/pd/pkg/storage/endpoint"
"github.com/tikv/pd/pkg/utils/syncutil"
"go.uber.org/zap"
)

// ResourceGroup is the definition of a resource group, for REST API.
type ResourceGroup struct {
sync.RWMutex
syncutil.RWMutex
Name string `json:"name"`
Mode rmpb.GroupMode `json:"mode"`
// RU settings
10 changes: 5 additions & 5 deletions pkg/memory/meminfo.go
@@ -15,14 +15,14 @@
package memory

import (
"sync"
"time"

"github.com/pingcap/failpoint"
"github.com/pingcap/log"
"github.com/pingcap/sysutil"
"github.com/shirou/gopsutil/v3/mem"
"github.com/tikv/pd/pkg/cgroup"
"github.com/tikv/pd/pkg/utils/syncutil"
"go.uber.org/zap"
"golang.org/x/exp/constraints"
)
@@ -76,7 +76,7 @@ func MemUsedNormal() (uint64, error) {

type memInfoCache struct {
updateTime time.Time
mu *sync.RWMutex
mu *syncutil.RWMutex
mem uint64
}

@@ -168,13 +168,13 @@ func init() {
MemUsed = MemUsedNormal
}
memLimit = &memInfoCache{
mu: &sync.RWMutex{},
mu: &syncutil.RWMutex{},
}
memUsage = &memInfoCache{
mu: &sync.RWMutex{},
mu: &syncutil.RWMutex{},
}
serverMemUsage = &memInfoCache{
mu: &sync.RWMutex{},
mu: &syncutil.RWMutex{},
}
_, err := MemTotal()
mustNil(err)
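In meminfo.go the RWMutex is held by pointer rather than embedded, so the init() initializers change from &sync.RWMutex{} to &syncutil.RWMutex{} as shown above. A hedged sketch of how such a cache would use that lock (accessor names are assumptions, not part of this diff):

// Hypothetical accessors, for illustration only: reads take the shared lock,
// writes take the exclusive lock on the pointer-held syncutil.RWMutex.
func (c *memInfoCache) get() (mem uint64, updateTime time.Time) {
	c.mu.RLock()
	defer c.mu.RUnlock()
	return c.mem, c.updateTime
}

func (c *memInfoCache) set(mem uint64, updateTime time.Time) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.mem, c.updateTime = mem, updateTime
}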
11 changes: 6 additions & 5 deletions pkg/ratelimit/limiter_test.go
@@ -20,6 +20,7 @@ import (
"time"

"github.com/stretchr/testify/require"
"github.com/tikv/pd/pkg/utils/syncutil"
"golang.org/x/time/rate"
)

@@ -33,7 +34,7 @@ func TestUpdateConcurrencyLimiter(t *testing.T) {
label := "test"
status := limiter.Update(label, opts...)
re.True(status&ConcurrencyChanged != 0)
var lock sync.Mutex
var lock syncutil.Mutex
successCount, failedCount := 0, 0
var wg sync.WaitGroup
for i := 0; i < 15; i++ {
@@ -118,7 +119,7 @@ func TestUpdateQPSLimiter(t *testing.T) {
status := limiter.Update(label, opts...)
re.True(status&QPSChanged != 0)

var lock sync.Mutex
var lock syncutil.Mutex
successCount, failedCount := 0, 0
var wg sync.WaitGroup
wg.Add(3)
@@ -173,7 +174,7 @@ func TestQPSLimiter(t *testing.T) {
opt(label, limiter)
}

var lock sync.Mutex
var lock syncutil.Mutex
successCount, failedCount := 0, 0
var wg sync.WaitGroup
wg.Add(200)
@@ -208,7 +209,7 @@ func TestTwoLimiters(t *testing.T) {
opt(label, limiter)
}

var lock sync.Mutex
var lock syncutil.Mutex
successCount, failedCount := 0, 0
var wg sync.WaitGroup
wg.Add(200)
@@ -245,7 +246,7 @@ func TestTwoLimiters(t *testing.T) {
}

func countRateLimiterHandleResult(limiter *Limiter, label string, successCount *int,
failedCount *int, lock *sync.Mutex, wg *sync.WaitGroup) {
failedCount *int, lock *syncutil.Mutex, wg *sync.WaitGroup) {
result := limiter.Allow(label)
lock.Lock()
defer lock.Unlock()
3 changes: 2 additions & 1 deletion pkg/schedule/schedulers/scheduler_controller.go
@@ -30,6 +30,7 @@ import (
"github.com/tikv/pd/pkg/schedule/plan"
"github.com/tikv/pd/pkg/storage/endpoint"
"github.com/tikv/pd/pkg/utils/logutil"
"github.com/tikv/pd/pkg/utils/syncutil"
"go.uber.org/zap"
)

@@ -39,7 +40,7 @@ var denySchedulersByLabelerCounter = labeler.LabelerEventCounter.WithLabelValues

// Controller is used to manage all schedulers.
type Controller struct {
sync.RWMutex
syncutil.RWMutex
wg sync.WaitGroup
ctx context.Context
cluster sche.SchedulerCluster
6 changes: 3 additions & 3 deletions pkg/statistics/region_collection.go
@@ -15,13 +15,13 @@
package statistics

import (
"sync"
"time"

"github.com/pingcap/log"
"github.com/tikv/pd/pkg/core"
sc "github.com/tikv/pd/pkg/schedule/config"
"github.com/tikv/pd/pkg/schedule/placement"
"github.com/tikv/pd/pkg/utils/syncutil"
)

// RegionInfoProvider is an interface to provide the region information.
@@ -85,7 +85,7 @@ type RegionInfoWithTS struct {

// RegionStatistics is used to record the status of regions.
type RegionStatistics struct {
sync.RWMutex
syncutil.RWMutex
rip RegionInfoProvider
conf sc.CheckerConfigProvider
stats map[RegionStatisticType]map[uint64]*RegionInfoWithTS
@@ -284,7 +284,7 @@ func (r *RegionStatistics) Reset() {

// LabelStatistics is the statistics of the level of labels.
type LabelStatistics struct {
sync.RWMutex
syncutil.RWMutex
regionLabelStats map[uint64]string
labelCounter map[string]int
}
3 changes: 2 additions & 1 deletion pkg/tso/keyspace_group_manager.go
@@ -43,6 +43,7 @@ import (
"github.com/tikv/pd/pkg/utils/etcdutil"
"github.com/tikv/pd/pkg/utils/logutil"
"github.com/tikv/pd/pkg/utils/memberutil"
"github.com/tikv/pd/pkg/utils/syncutil"
"github.com/tikv/pd/pkg/utils/tsoutil"
"github.com/tikv/pd/pkg/utils/typeutil"
"go.etcd.io/etcd/clientv3"
@@ -62,7 +63,7 @@
)

type state struct {
sync.RWMutex
syncutil.RWMutex
// ams stores the allocator managers of the keyspace groups. Each keyspace group is
// assigned with an allocator manager managing its global/local tso allocators.
// Use a fixed size array to maximize the efficiency of concurrent access to
7 changes: 4 additions & 3 deletions pkg/tso/keyspace_group_manager_test.go
@@ -36,6 +36,7 @@ import (
mcsutils "github.com/tikv/pd/pkg/mcs/utils"
"github.com/tikv/pd/pkg/storage/endpoint"
"github.com/tikv/pd/pkg/utils/etcdutil"
"github.com/tikv/pd/pkg/utils/syncutil"
"github.com/tikv/pd/pkg/utils/tempurl"
"github.com/tikv/pd/pkg/utils/testutil"
"github.com/tikv/pd/pkg/utils/tsoutil"
@@ -751,7 +752,7 @@ func (suite *keyspaceGroupManagerTestSuite) runTestLoadKeyspaceGroupsAssignment(
defer mgr.Close()

step := 30
mux := sync.Mutex{}
mux := syncutil.Mutex{}
wg := sync.WaitGroup{}
for i := 0; i < numberOfKeyspaceGroupsToAdd; i += step {
wg.Add(1)
@@ -872,12 +873,12 @@ func addKeyspaceGroupAssignment(
groupID uint32,
rootPath string,
svcAddrs []string,
priorites []int,
priorities []int,
keyspaces []uint32,
) error {
members := make([]endpoint.KeyspaceGroupMember, len(svcAddrs))
for i, svcAddr := range svcAddrs {
members[i] = endpoint.KeyspaceGroupMember{Address: svcAddr, Priority: priorites[i]}
members[i] = endpoint.KeyspaceGroupMember{Address: svcAddr, Priority: priorities[i]}
}
group := &endpoint.KeyspaceGroup{
ID: groupID,
3 changes: 2 additions & 1 deletion pkg/utils/etcdutil/etcdutil.go
@@ -31,6 +31,7 @@ import (
"github.com/tikv/pd/pkg/errs"
"github.com/tikv/pd/pkg/utils/grpcutil"
"github.com/tikv/pd/pkg/utils/logutil"
"github.com/tikv/pd/pkg/utils/syncutil"
"github.com/tikv/pd/pkg/utils/typeutil"
"go.etcd.io/etcd/clientv3"
"go.etcd.io/etcd/etcdserver"
@@ -566,7 +567,7 @@ type LoopWatcher struct {
postEventFn func() error

// forceLoadMu is used to ensure two force loads have minimal interval.
forceLoadMu sync.RWMutex
forceLoadMu syncutil.RWMutex
// lastTimeForceLoad is used to record the last time force loading data from etcd.
lastTimeForceLoad time.Time

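The comment on forceLoadMu says it keeps two force loads a minimal interval apart; combined with lastTimeForceLoad that points to a time-guarded pattern roughly like the hedged sketch below (method name and parameter are assumptions, not part of this diff):

// Hypothetical helper, for illustration only: the mutex serializes the check
// of lastTimeForceLoad so at most one caller per interval proceeds to etcd.
func (lw *LoopWatcher) tryForceLoad(minInterval time.Duration) bool {
	lw.forceLoadMu.Lock()
	defer lw.forceLoadMu.Unlock()
	if time.Since(lw.lastTimeForceLoad) < minInterval {
		return false
	}
	lw.lastTimeForceLoad = time.Now()
	return true
}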