Skip to content

Commit

Permalink
async dispatch
Browse files Browse the repository at this point in the history
Signed-off-by: nolouch <[email protected]>
  • Loading branch information
nolouch committed Mar 7, 2024
1 parent 4d9a680 commit 72b0721
Show file tree
Hide file tree
Showing 3 changed files with 34 additions and 3 deletions.
2 changes: 1 addition & 1 deletion pkg/mcs/scheduling/server/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -548,7 +548,7 @@ func (c *Cluster) HandleRegionHeartbeat(region *core.RegionInfo) error {
return err
}
tracer.OnAllStageFinished()
c.coordinator.GetOperatorController().Dispatch(region, operator.DispatchFromHeartBeat, c.coordinator.RecordOpStepWithTTL)
c.coordinator.GetOperatorController().AsyncDispatch(region, operator.DispatchFromHeartBeat, c.coordinator.RecordOpStepWithTTL)
return nil
}

Expand Down
33 changes: 32 additions & 1 deletion pkg/schedule/operator/operator_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,11 +66,18 @@ type Controller struct {
wop WaitingOperator
wopStatus *waitingOperatorStatus
opNotifierQueue operatorQueue
dispatchChan chan *CheckTask
}

type CheckTask struct {
Region *core.RegionInfo
Source string
RecordOpStepWithTTL func(regionID uint64)
}

// NewController creates a Controller.
func NewController(ctx context.Context, cluster *core.BasicCluster, config config.SharedConfigProvider, hbStreams *hbstream.HeartbeatStreams) *Controller {
return &Controller{
rc := &Controller{
ctx: ctx,
cluster: cluster,
config: config,
Expand All @@ -82,7 +89,10 @@ func NewController(ctx context.Context, cluster *core.BasicCluster, config confi
wop: newRandBuckets(),
wopStatus: newWaitingOperatorStatus(),
opNotifierQueue: make(operatorQueue, 0),
dispatchChan: make(chan *CheckTask, 102400),
}
go rc.dispatchLoop()
return rc
}

// Ctx returns a context which will be canceled once RaftCluster is stopped.
Expand All @@ -103,6 +113,27 @@ func (oc *Controller) GetHBStreams() *hbstream.HeartbeatStreams {
return oc.hbStreams
}

func (oc *Controller) dispatchLoop() {
for {
select {
case <-oc.ctx.Done():
return
case task := <-oc.dispatchChan:
oc.Dispatch(task.Region, task.Source, task.RecordOpStepWithTTL)
}
}
}

func (oc *Controller) AsyncDispatch(region *core.RegionInfo, source string, recordOpStepWithTTL func(regionID uint64)) {
if op := oc.GetOperator(region.GetID()); op != nil {
oc.dispatchChan <- &CheckTask{
Region: region,
Source: source,
RecordOpStepWithTTL: recordOpStepWithTTL,
}
}
}

// Dispatch is used to dispatch the operator of a region.
func (oc *Controller) Dispatch(region *core.RegionInfo, source string, recordOpStepWithTTL func(regionID uint64)) {
// Check existed
Expand Down
2 changes: 1 addition & 1 deletion server/cluster/cluster_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func (c *RaftCluster) HandleRegionHeartbeat(region *core.RegionInfo) error {
if c.IsServiceIndependent(mcsutils.SchedulingServiceName) {
return nil
}
c.coordinator.GetOperatorController().Dispatch(region, operator.DispatchFromHeartBeat, c.coordinator.RecordOpStepWithTTL)
c.coordinator.GetOperatorController().AsyncDispatch(region, operator.DispatchFromHeartBeat, c.coordinator.RecordOpStepWithTTL)
return nil
}

Expand Down

0 comments on commit 72b0721

Please sign in to comment.