Skip to content

Commit

Permalink
use reset for ticker
Browse files Browse the repository at this point in the history
Signed-off-by: Ryan Leung <[email protected]>
  • Loading branch information
rleungx committed Sep 2, 2024
1 parent 571c59b commit 4f047bc
Show file tree
Hide file tree
Showing 10 changed files with 18 additions and 29 deletions.
9 changes: 3 additions & 6 deletions client/resource_group/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -249,16 +249,13 @@ func (c *ResourceGroupsController) Start(ctx context.Context) {
defer emergencyTokenAcquisitionTicker.Stop()

failpoint.Inject("fastCleanup", func() {
cleanupTicker.Stop()
cleanupTicker = time.NewTicker(100 * time.Millisecond)
cleanupTicker.Reset(100 * time.Millisecond)
// because of checking `gc.run.consumption` in cleanupTicker,
// so should also change the stateUpdateTicker.
stateUpdateTicker.Stop()
stateUpdateTicker = time.NewTicker(200 * time.Millisecond)
stateUpdateTicker.Reset(200 * time.Millisecond)
})
failpoint.Inject("acceleratedReportingPeriod", func() {
stateUpdateTicker.Stop()
stateUpdateTicker = time.NewTicker(time.Millisecond * 100)
stateUpdateTicker.Reset(time.Millisecond * 100)
})

_, metaRevision, err := c.provider.LoadResourceGroups(ctx)
Expand Down
3 changes: 1 addition & 2 deletions pkg/keyspace/tso_keyspace_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,8 +162,7 @@ func (m *GroupManager) allocNodesToAllKeyspaceGroups(ctx context.Context) {
defer m.wg.Done()
ticker := time.NewTicker(allocNodesToKeyspaceGroupsInterval)
failpoint.Inject("acceleratedAllocNodes", func() {
ticker.Stop()
ticker = time.NewTicker(time.Millisecond * 100)
ticker.Reset(time.Millisecond * 100)
})
defer ticker.Stop()
log.Info("start to alloc nodes to all keyspace groups")
Expand Down
3 changes: 1 addition & 2 deletions pkg/mcs/resourcemanager/server/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -324,8 +324,7 @@ func (m *Manager) GetResourceGroupList(withStats bool) []*ResourceGroup {
func (m *Manager) persistLoop(ctx context.Context) {
ticker := time.NewTicker(time.Minute)
failpoint.Inject("fastPersist", func() {
ticker.Stop()
ticker = time.NewTicker(100 * time.Millisecond)
ticker.Reset(100 * time.Millisecond)
})
defer ticker.Stop()
for {
Expand Down
3 changes: 1 addition & 2 deletions pkg/mcs/scheduling/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,8 +178,7 @@ func (s *Server) updateAPIServerMemberLoop() {
defer cancel()
ticker := time.NewTicker(memberUpdateInterval)
failpoint.Inject("fastUpdateMember", func() {
ticker.Stop()
ticker = time.NewTicker(100 * time.Millisecond)
ticker.Reset(100 * time.Millisecond)
})
defer ticker.Stop()
var curLeader uint64
Expand Down
2 changes: 1 addition & 1 deletion pkg/schedule/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,7 @@ func (c *Coordinator) RunUntilStop(collectWaitTime ...time.Duration) {
func (c *Coordinator) Run(collectWaitTime ...time.Duration) {
ticker := time.NewTicker(runSchedulerCheckInterval)
failpoint.Inject("changeCoordinatorTicker", func() {
ticker = time.NewTicker(100 * time.Millisecond)
ticker.Reset(100 * time.Millisecond)
})
defer ticker.Stop()
log.Info("coordinator starts to collect cluster information")
Expand Down
3 changes: 1 addition & 2 deletions pkg/tso/allocator_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -715,8 +715,7 @@ func (am *AllocatorManager) AllocatorDaemon(ctx context.Context) {
}
tsTicker := time.NewTicker(am.updatePhysicalInterval)
failpoint.Inject("fastUpdatePhysicalInterval", func() {
tsTicker.Stop()
tsTicker = time.NewTicker(time.Millisecond)
tsTicker.Reset(time.Millisecond)
})
defer tsTicker.Stop()
checkerTicker := time.NewTicker(PriorityCheck)
Expand Down
9 changes: 3 additions & 6 deletions server/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -396,8 +396,7 @@ func (c *RaftCluster) runServiceCheckJob() {

ticker := time.NewTicker(serviceCheckInterval)
failpoint.Inject("highFrequencyClusterJobs", func() {
ticker.Stop()
ticker = time.NewTicker(time.Millisecond)
ticker.Reset(time.Millisecond)
})
defer ticker.Stop()

Expand Down Expand Up @@ -675,8 +674,7 @@ func (c *RaftCluster) runMetricsCollectionJob() {

ticker := time.NewTicker(metricsCollectionJobInterval)
failpoint.Inject("highFrequencyClusterJobs", func() {
ticker.Stop()
ticker = time.NewTicker(time.Millisecond)
ticker.Reset(time.Millisecond)
})
defer ticker.Stop()

Expand All @@ -699,8 +697,7 @@ func (c *RaftCluster) runNodeStateCheckJob() {

ticker := time.NewTicker(nodeStateCheckJobInterval)
failpoint.Inject("highFrequencyClusterJobs", func() {
ticker.Stop()
ticker = time.NewTicker(2 * time.Second)
ticker.Reset(2 * time.Second)
})
defer ticker.Stop()

Expand Down
3 changes: 1 addition & 2 deletions server/cluster/scheduling_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,8 +150,7 @@ func (sc *schedulingController) runSchedulingMetricsCollectionJob() {

ticker := time.NewTicker(metricsCollectionJobInterval)
failpoint.Inject("highFrequencyClusterJobs", func() {
ticker.Stop()
ticker = time.NewTicker(time.Millisecond)
ticker.Reset(time.Millisecond)
})
defer ticker.Stop()

Expand Down
6 changes: 3 additions & 3 deletions tools/pd-api-bench/cases/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ func (c *httpController) run() {
for i := int64(0); i < burst; i++ {
go func() {
defer c.wg.Done()
var ticker = time.NewTicker(tt)
ticker := time.NewTicker(tt)
defer ticker.Stop()
for {
select {
Expand Down Expand Up @@ -293,7 +293,7 @@ func (c *gRPCController) run() {
for i := int64(0); i < burst; i++ {
go func() {
defer c.wg.Done()
var ticker = time.NewTicker(tt)
ticker := time.NewTicker(tt)
defer ticker.Stop()
for {
select {
Expand Down Expand Up @@ -367,7 +367,7 @@ func (c *etcdController) run() {
for i := int64(0); i < burst; i++ {
go func() {
defer c.wg.Done()
var ticker = time.NewTicker(tt)
ticker := time.NewTicker(tt)
defer ticker.Stop()
for {
select {
Expand Down
6 changes: 3 additions & 3 deletions tools/pd-heartbeat-bench/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ func putStores(ctx context.Context, cfg *config.Config, cli pdpb.PDClient, store
log.Fatal("failed to put store", zap.Uint64("store-id", i), zap.String("err", resp.GetHeader().GetError().String()))
}
go func(ctx context.Context, storeID uint64) {
var heartbeatTicker = time.NewTicker(10 * time.Second)
heartbeatTicker := time.NewTicker(10 * time.Second)
defer heartbeatTicker.Stop()
for {
select {
Expand Down Expand Up @@ -526,9 +526,9 @@ func main() {
header := &pdpb.RequestHeader{
ClusterId: clusterID,
}
var heartbeatTicker = time.NewTicker(regionReportInterval * time.Second)
heartbeatTicker := time.NewTicker(regionReportInterval * time.Second)
defer heartbeatTicker.Stop()
var resolvedTSTicker = time.NewTicker(time.Second)
resolvedTSTicker := time.NewTicker(time.Second)
defer resolvedTSTicker.Stop()
withMetric := metrics.InitMetric2Collect(cfg.MetricsAddr)
for {
Expand Down

0 comments on commit 4f047bc

Please sign in to comment.