From 27265a13af3a86c1347b027a2f161cbba90b3fa4 Mon Sep 17 00:00:00 2001 From: husharp Date: Fri, 19 Apr 2024 18:46:57 +0800 Subject: [PATCH] make concurrent Signed-off-by: husharp --- pkg/schedule/operator/create_operator.go | 4 +- pkg/schedule/operator/operator.go | 46 ++++++++----------- pkg/schedule/operator/operator_controller.go | 14 +++--- pkg/schedule/operator/status_tracker.go | 33 +++++++++++++ pkg/schedule/operator/status_tracker_test.go | 25 ++++++++++ pkg/schedule/scatter/region_scatterer.go | 4 +- pkg/schedule/scatter/region_scatterer_test.go | 2 +- pkg/schedule/schedulers/balance_leader.go | 4 +- pkg/schedule/schedulers/balance_region.go | 4 +- pkg/schedule/schedulers/balance_witness.go | 4 +- pkg/schedule/schedulers/hot_region.go | 4 +- pkg/schedule/schedulers/split_bucket.go | 2 +- 12 files changed, 98 insertions(+), 48 deletions(-) diff --git a/pkg/schedule/operator/create_operator.go b/pkg/schedule/operator/create_operator.go index 1c96128ab32..71af96b58ba 100644 --- a/pkg/schedule/operator/create_operator.go +++ b/pkg/schedule/operator/create_operator.go @@ -170,8 +170,8 @@ func CreateSplitRegionOperator(desc string, region *core.RegionInfo, kind OpKind brief += fmt.Sprintf(" and keys %v", hexKeys) } op := NewOperator(desc, brief, region.GetID(), region.GetRegionEpoch(), kind|OpSplit, region.GetApproximateSize(), step) - op.AdditionalInfos["region-start-key"] = core.HexRegionKeyStr(logutil.RedactBytes(region.GetStartKey())) - op.AdditionalInfos["region-end-key"] = core.HexRegionKeyStr(logutil.RedactBytes(region.GetEndKey())) + op.SetAdditionalInfo("region-start-key", core.HexRegionKeyStr(logutil.RedactBytes(region.GetStartKey()))) + op.SetAdditionalInfo("region-end-key", core.HexRegionKeyStr(logutil.RedactBytes(region.GetEndKey()))) return op, nil } diff --git a/pkg/schedule/operator/operator.go b/pkg/schedule/operator/operator.go index b87a050969f..de197c4fba4 100644 --- a/pkg/schedule/operator/operator.go +++ b/pkg/schedule/operator/operator.go @@ -15,7 +15,6 @@ package operator import ( - "encoding/json" "fmt" "reflect" "strconv" @@ -83,7 +82,7 @@ type Operator struct { level constant.PriorityLevel Counters []prometheus.Counter FinishedCounters []prometheus.Counter - AdditionalInfos map[string]string + additionalInfos opAdditionalInfo ApproximateSize int64 timeout time.Duration influence *OpInfluence @@ -100,16 +99,18 @@ func NewOperator(desc, brief string, regionID uint64, regionEpoch *metapb.Region maxDuration += v.Timeout(approximateSize).Seconds() } return &Operator{ - desc: desc, - brief: brief, - regionID: regionID, - regionEpoch: regionEpoch, - kind: kind, - steps: steps, - stepsTime: make([]int64, len(steps)), - status: NewOpStatusTracker(), - level: level, - AdditionalInfos: make(map[string]string), + desc: desc, + brief: brief, + regionID: regionID, + regionEpoch: regionEpoch, + kind: kind, + steps: steps, + stepsTime: make([]int64, len(steps)), + status: NewOpStatusTracker(), + level: level, + additionalInfos: opAdditionalInfo{ + value: make(map[string]string), + }, ApproximateSize: approximateSize, timeout: time.Duration(maxDuration) * time.Second, } @@ -118,8 +119,8 @@ func NewOperator(desc, brief string, regionID uint64, regionEpoch *metapb.Region // Sync some attribute with the given timeout. func (o *Operator) Sync(other *Operator) { o.timeout = other.timeout - o.AdditionalInfos[string(RelatedMergeRegion)] = strconv.FormatUint(other.RegionID(), 10) - other.AdditionalInfos[string(RelatedMergeRegion)] = strconv.FormatUint(o.RegionID(), 10) + o.SetAdditionalInfo(string(RelatedMergeRegion), strconv.FormatUint(other.RegionID(), 10)) + other.SetAdditionalInfo(string(RelatedMergeRegion), strconv.FormatUint(o.RegionID(), 10)) } func (o *Operator) String() string { @@ -297,8 +298,10 @@ func (o *Operator) CheckSuccess() bool { // Cancel marks the operator canceled. func (o *Operator) Cancel(reason ...CancelReasonType) bool { - if _, ok := o.AdditionalInfos[cancelReason]; !ok && len(reason) != 0 { - o.AdditionalInfos[cancelReason] = string(reason[0]) + o.additionalInfos.Lock() + defer o.additionalInfos.Unlock() + if _, ok := o.additionalInfos.value[cancelReason]; !ok && len(reason) != 0 { + o.additionalInfos.value[cancelReason] = string(reason[0]) } return o.status.To(CANCELED) } @@ -507,17 +510,6 @@ func (o *Operator) Record(finishTime time.Time) *OpRecord { return record } -// GetAdditionalInfo returns additional info with string -func (o *Operator) GetAdditionalInfo() string { - if len(o.AdditionalInfos) != 0 { - additionalInfo, err := json.Marshal(o.AdditionalInfos) - if err == nil { - return string(additionalInfo) - } - } - return "" -} - // IsLeaveJointStateOperator returns true if the desc is OpDescLeaveJointState. func (o *Operator) IsLeaveJointStateOperator() bool { return strings.EqualFold(o.desc, OpDescLeaveJointState) diff --git a/pkg/schedule/operator/operator_controller.go b/pkg/schedule/operator/operator_controller.go index b9294ad970d..a13da7fd349 100644 --- a/pkg/schedule/operator/operator_controller.go +++ b/pkg/schedule/operator/operator_controller.go @@ -510,7 +510,7 @@ func (oc *Controller) addOperatorInner(op *Operator) bool { log.Info("add operator", zap.Uint64("region-id", regionID), zap.Reflect("operator", op), - zap.String("additional-info", op.GetAdditionalInfo())) + zap.String("additional-info", op.LogAdditionalInfo())) // If there is an old operator, replace it. The priority should be checked // already. @@ -657,7 +657,7 @@ func (oc *Controller) removeOperatorInner(op *Operator) bool { } func (oc *Controller) removeRelatedMergeOperator(op *Operator) { - relatedID, _ := strconv.ParseUint(op.AdditionalInfos[string(RelatedMergeRegion)], 10, 64) + relatedID, _ := strconv.ParseUint(op.GetAdditionalInfo(string(RelatedMergeRegion)), 10, 64) relatedOpi, ok := oc.operators.Load(relatedID) if !ok { return @@ -666,7 +666,7 @@ func (oc *Controller) removeRelatedMergeOperator(op *Operator) { if relatedOp != nil && relatedOp.Status() != CANCELED { log.Info("operator canceled related merge region", zap.Uint64("region-id", relatedOp.RegionID()), - zap.String("additional-info", relatedOp.GetAdditionalInfo()), + zap.String("additional-info", relatedOp.LogAdditionalInfo()), zap.Duration("takes", relatedOp.RunningTime())) oc.removeOperatorInner(relatedOp) relatedOp.Cancel(RelatedMergeRegion) @@ -695,7 +695,7 @@ func (oc *Controller) buryOperator(op *Operator) { zap.Uint64("region-id", op.RegionID()), zap.Duration("takes", op.RunningTime()), zap.Reflect("operator", op), - zap.String("additional-info", op.GetAdditionalInfo())) + zap.String("additional-info", op.LogAdditionalInfo())) operatorCounter.WithLabelValues(op.Desc(), "finish").Inc() operatorDuration.WithLabelValues(op.Desc()).Observe(op.RunningTime().Seconds()) for _, counter := range op.FinishedCounters { @@ -706,7 +706,7 @@ func (oc *Controller) buryOperator(op *Operator) { zap.Uint64("region-id", op.RegionID()), zap.Duration("takes", op.RunningTime()), zap.Reflect("operator", op), - zap.String("additional-info", op.GetAdditionalInfo())) + zap.String("additional-info", op.LogAdditionalInfo())) operatorCounter.WithLabelValues(op.Desc(), "replace").Inc() case EXPIRED: log.Info("operator expired", @@ -719,14 +719,14 @@ func (oc *Controller) buryOperator(op *Operator) { zap.Uint64("region-id", op.RegionID()), zap.Duration("takes", op.RunningTime()), zap.Reflect("operator", op), - zap.String("additional-info", op.GetAdditionalInfo())) + zap.String("additional-info", op.LogAdditionalInfo())) operatorCounter.WithLabelValues(op.Desc(), "timeout").Inc() case CANCELED: log.Info("operator canceled", zap.Uint64("region-id", op.RegionID()), zap.Duration("takes", op.RunningTime()), zap.Reflect("operator", op), - zap.String("additional-info", op.GetAdditionalInfo()), + zap.String("additional-info", op.LogAdditionalInfo()), ) operatorCounter.WithLabelValues(op.Desc(), "cancel").Inc() } diff --git a/pkg/schedule/operator/status_tracker.go b/pkg/schedule/operator/status_tracker.go index e103a74ccb3..b5ff4ac74e2 100644 --- a/pkg/schedule/operator/status_tracker.go +++ b/pkg/schedule/operator/status_tracker.go @@ -15,6 +15,7 @@ package operator import ( + "encoding/json" "time" "github.com/tikv/pd/pkg/utils/syncutil" @@ -136,3 +137,35 @@ func (trk *OpStatusTracker) String() string { defer trk.rw.RUnlock() return OpStatusToString(trk.current) } + +type opAdditionalInfo struct { + syncutil.RWMutex + value map[string]string +} + +// SetAdditionalInfo sets additional info with key and value. +func (o *Operator) SetAdditionalInfo(key string, value string) { + o.additionalInfos.Lock() + defer o.additionalInfos.Unlock() + o.additionalInfos.value[key] = value +} + +// GetAdditionalInfo returns additional info with key. +func (o *Operator) GetAdditionalInfo(key string) string { + o.additionalInfos.RLock() + defer o.additionalInfos.RUnlock() + return o.additionalInfos.value[key] +} + +// LogAdditionalInfo returns additional info with string +func (o *Operator) LogAdditionalInfo() string { + o.additionalInfos.RLock() + defer o.additionalInfos.RUnlock() + if len(o.additionalInfos.value) != 0 { + additionalInfo, err := json.Marshal(o.additionalInfos.value) + if err == nil { + return string(additionalInfo) + } + } + return "" +} diff --git a/pkg/schedule/operator/status_tracker_test.go b/pkg/schedule/operator/status_tracker_test.go index e53b017229a..8c897d1e545 100644 --- a/pkg/schedule/operator/status_tracker_test.go +++ b/pkg/schedule/operator/status_tracker_test.go @@ -15,6 +15,8 @@ package operator import ( + "fmt" + "sync" "testing" "time" @@ -178,3 +180,26 @@ func checkReachTime(re *require.Assertions, trk *OpStatusTracker, reached ...OpS re.True(trk.ReachTimeOf(st).IsZero()) } } + +func TestAdditionalInfoConcurrent(t *testing.T) { + op := NewOperator("test", "test", 0, nil, OpAdmin, 0) + + var wg sync.WaitGroup + for i := 0; i < 1000; i++ { + wg.Add(1) + go func(i int) { + defer wg.Done() + key := fmt.Sprintf("key%d", i) + value := fmt.Sprintf("value%d", i) + op.SetAdditionalInfo(key, value) + if op.GetAdditionalInfo(key) != value { + t.Errorf("unexpected value for key %s", key) + } + }(i) + } + wg.Wait() + + if logInfo := op.LogAdditionalInfo(); logInfo == "" { + t.Error("LogAdditionalInfo returned an empty string") + } +} diff --git a/pkg/schedule/scatter/region_scatterer.go b/pkg/schedule/scatter/region_scatterer.go index 898c4d052a7..bdec5c98c9c 100644 --- a/pkg/schedule/scatter/region_scatterer.go +++ b/pkg/schedule/scatter/region_scatterer.go @@ -399,8 +399,8 @@ func (r *RegionScatterer) scatterRegion(region *core.RegionInfo, group string, s if op != nil { scatterSuccessCounter.Inc() r.Put(targetPeers, targetLeader, group) - op.AdditionalInfos["group"] = group - op.AdditionalInfos["leader-picked-count"] = strconv.FormatUint(leaderStorePickedCount, 10) + op.SetAdditionalInfo("group", group) + op.SetAdditionalInfo("leader-picked-count", strconv.FormatUint(leaderStorePickedCount, 10)) op.SetPriorityLevel(constant.High) } return op, nil diff --git a/pkg/schedule/scatter/region_scatterer_test.go b/pkg/schedule/scatter/region_scatterer_test.go index af41ed04b76..b0027e0e415 100644 --- a/pkg/schedule/scatter/region_scatterer_test.go +++ b/pkg/schedule/scatter/region_scatterer_test.go @@ -679,7 +679,7 @@ func TestSelectedStoresTooFewPeers(t *testing.T) { re.NoError(err) re.False(isPeerCountChanged(op)) if op != nil { - re.Equal(group, op.AdditionalInfos["group"]) + re.Equal(group, op.GetAdditionalInfo("group")) } } } diff --git a/pkg/schedule/schedulers/balance_leader.go b/pkg/schedule/schedulers/balance_leader.go index e11e8492765..1b24d42b8d9 100644 --- a/pkg/schedule/schedulers/balance_leader.go +++ b/pkg/schedule/schedulers/balance_leader.go @@ -567,7 +567,7 @@ func (l *balanceLeaderScheduler) createOperator(solver *solver, collector *plan. op.FinishedCounters = append(op.FinishedCounters, balanceDirectionCounter.WithLabelValues(l.GetName(), solver.SourceMetricLabel(), solver.TargetMetricLabel()), ) - op.AdditionalInfos["sourceScore"] = strconv.FormatFloat(solver.sourceScore, 'f', 2, 64) - op.AdditionalInfos["targetScore"] = strconv.FormatFloat(solver.targetScore, 'f', 2, 64) + op.SetAdditionalInfo("sourceScore", strconv.FormatFloat(solver.sourceScore, 'f', 2, 64)) + op.SetAdditionalInfo("targetScore", strconv.FormatFloat(solver.targetScore, 'f', 2, 64)) return op } diff --git a/pkg/schedule/schedulers/balance_region.go b/pkg/schedule/schedulers/balance_region.go index 1cef3a4615b..8d024488201 100644 --- a/pkg/schedule/schedulers/balance_region.go +++ b/pkg/schedule/schedulers/balance_region.go @@ -278,8 +278,8 @@ func (s *balanceRegionScheduler) transferPeer(solver *solver, collector *plan.Co op.FinishedCounters = append(op.FinishedCounters, balanceDirectionCounter.WithLabelValues(s.GetName(), sourceLabel, targetLabel), ) - op.AdditionalInfos["sourceScore"] = strconv.FormatFloat(solver.sourceScore, 'f', 2, 64) - op.AdditionalInfos["targetScore"] = strconv.FormatFloat(solver.targetScore, 'f', 2, 64) + op.SetAdditionalInfo("sourceScore", strconv.FormatFloat(solver.sourceScore, 'f', 2, 64)) + op.SetAdditionalInfo("targetScore", strconv.FormatFloat(solver.targetScore, 'f', 2, 64)) return op } diff --git a/pkg/schedule/schedulers/balance_witness.go b/pkg/schedule/schedulers/balance_witness.go index aee112c9dc1..07e25582b30 100644 --- a/pkg/schedule/schedulers/balance_witness.go +++ b/pkg/schedule/schedulers/balance_witness.go @@ -378,7 +378,7 @@ func (b *balanceWitnessScheduler) createOperator(solver *solver, collector *plan b.counter.WithLabelValues("move-witness", solver.SourceMetricLabel()+"-out"), b.counter.WithLabelValues("move-witness", solver.TargetMetricLabel()+"-in"), ) - op.AdditionalInfos["sourceScore"] = strconv.FormatFloat(solver.sourceScore, 'f', 2, 64) - op.AdditionalInfos["targetScore"] = strconv.FormatFloat(solver.targetScore, 'f', 2, 64) + op.SetAdditionalInfo("sourceScore", strconv.FormatFloat(solver.sourceScore, 'f', 2, 64)) + op.SetAdditionalInfo("targetScore", strconv.FormatFloat(solver.targetScore, 'f', 2, 64)) return op } diff --git a/pkg/schedule/schedulers/hot_region.go b/pkg/schedule/schedulers/hot_region.go index 2a38ef399c8..2602cd701f9 100644 --- a/pkg/schedule/schedulers/hot_region.go +++ b/pkg/schedule/schedulers/hot_region.go @@ -1661,8 +1661,8 @@ func (bs *balanceSolver) splitBucketsByLoad(region *core.RegionInfo, bucketStats } op := bs.splitBucketsOperator(region, [][]byte{splitKey}) if op != nil { - op.AdditionalInfos["accLoads"] = strconv.FormatUint(acc-stats[splitIdx-1].Loads[dim], 10) - op.AdditionalInfos["totalLoads"] = strconv.FormatUint(totalLoads, 10) + op.SetAdditionalInfo("accLoads", strconv.FormatUint(acc-stats[splitIdx-1].Loads[dim], 10)) + op.SetAdditionalInfo("totalLoads", strconv.FormatUint(totalLoads, 10)) } return op } diff --git a/pkg/schedule/schedulers/split_bucket.go b/pkg/schedule/schedulers/split_bucket.go index 7e276402e49..96f2f3ce7f0 100644 --- a/pkg/schedule/schedulers/split_bucket.go +++ b/pkg/schedule/schedulers/split_bucket.go @@ -297,7 +297,7 @@ func (s *splitBucketScheduler) splitBucket(plan *splitBucketPlan) []*operator.Op return nil } splitBucketNewOperatorCounter.Inc() - op.AdditionalInfos["hot-degree"] = strconv.FormatInt(int64(splitBucket.HotDegree), 10) + op.SetAdditionalInfo("hot-degree", strconv.FormatInt(int64(splitBucket.HotDegree), 10)) return []*operator.Operator{op} } return nil