From 72b0721d10dd27c1735dcc0ba2242dd7d9db047e Mon Sep 17 00:00:00 2001 From: nolouch Date: Thu, 7 Mar 2024 18:46:37 +0800 Subject: [PATCH] async dispatch Signed-off-by: nolouch --- pkg/mcs/scheduling/server/cluster.go | 2 +- pkg/schedule/operator/operator_controller.go | 33 +++++++++++++++++++- server/cluster/cluster_worker.go | 2 +- 3 files changed, 34 insertions(+), 3 deletions(-) diff --git a/pkg/mcs/scheduling/server/cluster.go b/pkg/mcs/scheduling/server/cluster.go index 1b915b6874d2..3f02a1632fdc 100644 --- a/pkg/mcs/scheduling/server/cluster.go +++ b/pkg/mcs/scheduling/server/cluster.go @@ -548,7 +548,7 @@ func (c *Cluster) HandleRegionHeartbeat(region *core.RegionInfo) error { return err } tracer.OnAllStageFinished() - c.coordinator.GetOperatorController().Dispatch(region, operator.DispatchFromHeartBeat, c.coordinator.RecordOpStepWithTTL) + c.coordinator.GetOperatorController().AsyncDispatch(region, operator.DispatchFromHeartBeat, c.coordinator.RecordOpStepWithTTL) return nil } diff --git a/pkg/schedule/operator/operator_controller.go b/pkg/schedule/operator/operator_controller.go index 07cafb9c566e..45e878f030b0 100644 --- a/pkg/schedule/operator/operator_controller.go +++ b/pkg/schedule/operator/operator_controller.go @@ -66,11 +66,18 @@ type Controller struct { wop WaitingOperator wopStatus *waitingOperatorStatus opNotifierQueue operatorQueue + dispatchChan chan *CheckTask +} + +type CheckTask struct { + Region *core.RegionInfo + Source string + RecordOpStepWithTTL func(regionID uint64) } // NewController creates a Controller. func NewController(ctx context.Context, cluster *core.BasicCluster, config config.SharedConfigProvider, hbStreams *hbstream.HeartbeatStreams) *Controller { - return &Controller{ + rc := &Controller{ ctx: ctx, cluster: cluster, config: config, @@ -82,7 +89,10 @@ func NewController(ctx context.Context, cluster *core.BasicCluster, config confi wop: newRandBuckets(), wopStatus: newWaitingOperatorStatus(), opNotifierQueue: make(operatorQueue, 0), + dispatchChan: make(chan *CheckTask, 102400), } + go rc.dispatchLoop() + return rc } // Ctx returns a context which will be canceled once RaftCluster is stopped. @@ -103,6 +113,27 @@ func (oc *Controller) GetHBStreams() *hbstream.HeartbeatStreams { return oc.hbStreams } +func (oc *Controller) dispatchLoop() { + for { + select { + case <-oc.ctx.Done(): + return + case task := <-oc.dispatchChan: + oc.Dispatch(task.Region, task.Source, task.RecordOpStepWithTTL) + } + } +} + +func (oc *Controller) AsyncDispatch(region *core.RegionInfo, source string, recordOpStepWithTTL func(regionID uint64)) { + if op := oc.GetOperator(region.GetID()); op != nil { + oc.dispatchChan <- &CheckTask{ + Region: region, + Source: source, + RecordOpStepWithTTL: recordOpStepWithTTL, + } + } +} + // Dispatch is used to dispatch the operator of a region. func (oc *Controller) Dispatch(region *core.RegionInfo, source string, recordOpStepWithTTL func(regionID uint64)) { // Check existed diff --git a/server/cluster/cluster_worker.go b/server/cluster/cluster_worker.go index 5ae8fdc0396f..5735ec4e69da 100644 --- a/server/cluster/cluster_worker.go +++ b/server/cluster/cluster_worker.go @@ -48,7 +48,7 @@ func (c *RaftCluster) HandleRegionHeartbeat(region *core.RegionInfo) error { if c.IsServiceIndependent(mcsutils.SchedulingServiceName) { return nil } - c.coordinator.GetOperatorController().Dispatch(region, operator.DispatchFromHeartBeat, c.coordinator.RecordOpStepWithTTL) + c.coordinator.GetOperatorController().AsyncDispatch(region, operator.DispatchFromHeartBeat, c.coordinator.RecordOpStepWithTTL) return nil }