Skip to content

Commit

Permalink
update
Browse files Browse the repository at this point in the history
Signed-off-by: husharp <[email protected]>
  • Loading branch information
HuSharp committed Jun 30, 2023
1 parent 4ca73d0 commit 9b0a81a
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 9 deletions.
8 changes: 4 additions & 4 deletions pkg/schedule/operator/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,8 @@ var (
ExceedStoreLimit CancelReasonType = "exceed store limit"
// ExceedWaitLimit is the cancel reason when the operator exceeds the waiting queue limit.
ExceedWaitLimit CancelReasonType = "exceed wait limit"
// RelatedRegionID is the cancel reason when the operator is related to another region.
RelatedRegionID CancelReasonType = "related region id"
// RelatedMergeRegion is the cancel reason when the operator is cancelled by related merge region.
RelatedMergeRegion CancelReasonType = "related merge region"
// Unknown is the cancel reason when the operator is cancelled by an unknown reason.
Unknown CancelReasonType = "unknown"
)
Expand Down Expand Up @@ -120,8 +120,8 @@ func NewOperator(desc, brief string, regionID uint64, regionEpoch *metapb.Region
// Sync some attribute with the given timeout.
func (o *Operator) Sync(other *Operator) {
o.timeout = other.timeout
o.AdditionalInfos[string(RelatedRegionID)] = strconv.FormatUint(other.RegionID(), 10)
other.AdditionalInfos[string(RelatedRegionID)] = strconv.FormatUint(o.RegionID(), 10)
o.AdditionalInfos[string(RelatedMergeRegion)] = strconv.FormatUint(other.RegionID(), 10)
other.AdditionalInfos[string(RelatedMergeRegion)] = strconv.FormatUint(o.RegionID(), 10)
}

func (o *Operator) String() string {
Expand Down
15 changes: 10 additions & 5 deletions pkg/schedule/operator/operator_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,9 @@ func (oc *Controller) Dispatch(region *core.RegionInfo, source string, recordOpS
})
_ = op.Cancel(NotInRunningState)
oc.buryOperator(op)
if op.Kind()&OpMerge != 0 {
oc.removeRelatedOperator(op)
}
operatorWaitCounter.WithLabelValues(op.Desc(), "promote-unexpected").Inc()
oc.PromoteWaitingOperator()
}
Expand Down Expand Up @@ -559,7 +562,7 @@ func (oc *Controller) RemoveOperator(op *Operator, reasons ...CancelReasonType)
}
oc.buryOperator(op)
if op.Kind()&OpMerge != 0 {
oc.removeRelatedOperatorLocked(op)
oc.removeRelatedOperator(op)
}
}
return removed
Expand All @@ -583,15 +586,17 @@ func (oc *Controller) removeOperatorLocked(op *Operator) bool {
return false
}

func (oc *Controller) removeRelatedOperatorLocked(op *Operator) {
relatedID, _ := strconv.ParseUint(op.AdditionalInfos[string(RelatedRegionID)], 10, 64)
func (oc *Controller) removeRelatedOperator(op *Operator) {
oc.Lock()
defer oc.Unlock()
relatedID, _ := strconv.ParseUint(op.AdditionalInfos[string(RelatedMergeRegion)], 10, 64)
if relatedOp := oc.operators[relatedID]; relatedOp != nil && relatedOp.Status() != CANCELED {
log.Info("operator canceled related region",
zap.Uint64("region-id", op.RegionID()),
zap.Uint64("region-id", relatedOp.RegionID()),
zap.String("additional-info", relatedOp.GetAdditionalInfo()),
zap.Duration("takes", relatedOp.RunningTime()))
relatedOp.Cancel(RelatedRegionID)
oc.removeOperatorLocked(relatedOp)
relatedOp.Cancel(RelatedMergeRegion)
oc.buryOperator(relatedOp)
}
}
Expand Down

0 comments on commit 9b0a81a

Please sign in to comment.