Skip to content

Commit

Permalink
1. reduce unnecessary continue
Browse files Browse the repository at this point in the history
2. replace timer with ticker in for-retry
3. add reset prefunc in lease.go

Signed-off-by: lhy1024 <[email protected]>
  • Loading branch information
lhy1024 committed Jun 30, 2023
1 parent 5a56675 commit 135659c
Show file tree
Hide file tree
Showing 4 changed files with 15 additions and 12 deletions.
2 changes: 0 additions & 2 deletions client/tso_dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,11 +214,9 @@ func (c *tsoClient) watchTSDeadline(ctx context.Context, dcLocation string) {
log.Error("[tso] tso request is canceled due to timeout", zap.String("dc-location", dc), errs.ZapError(errs.ErrClientGetTSOTimeout))
d.cancel()
timerPool.Put(d.timer) // it's safe to put the timer back to the pool
continue
case <-d.done:
d.timer.Stop() // not received from timer.C, so we need to stop the timer
timerPool.Put(d.timer)
continue
case <-ctx.Done():
d.timer.Stop() // not received from timer.C, so we need to stop the timer
timerPool.Put(d.timer)
Expand Down
9 changes: 9 additions & 0 deletions pkg/election/lease.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,15 @@ func (l *lease) KeepAlive(ctx context.Context) {
l.expireTime.Store(t)
}
}
// Stop the timer if it's not stopped.
if !timer.Stop() {
select {
case <-timer.C: // try to drain from the channel
default:
}
}
// We need be careful here, see more details in the comments of Timer.Reset.
// https://pkg.go.dev/time@master#Timer.Reset
timer.Reset(l.leaseTimeout)
case <-timer.C:
log.Info("lease timeout", zap.Time("expire", l.expireTime.Load().(time.Time)), zap.String("purpose", l.Purpose))
Expand Down
14 changes: 6 additions & 8 deletions pkg/schedule/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,8 +128,8 @@ func (c *Coordinator) PatrolRegions() {
defer logutil.LogPanic()

defer c.wg.Done()
timer := time.NewTimer(c.cluster.GetOpts().GetPatrolRegionInterval())
defer timer.Stop()
ticker := time.NewTicker(c.cluster.GetOpts().GetPatrolRegionInterval())
defer ticker.Stop()

log.Info("Coordinator starts patrol regions")
start := time.Now()
Expand All @@ -139,8 +139,7 @@ func (c *Coordinator) PatrolRegions() {
)
for {
select {
case <-timer.C:
timer.Reset(c.cluster.GetOpts().GetPatrolRegionInterval())
case <-ticker.C:
case <-c.ctx.Done():
log.Info("patrol regions has been stopped")
return
Expand Down Expand Up @@ -848,12 +847,11 @@ func (c *Coordinator) runScheduler(s *scheduleController) {
defer c.wg.Done()
defer s.Scheduler.Cleanup(c.cluster)

timer := time.NewTimer(s.GetInterval())
defer timer.Stop()
ticker := time.NewTicker(s.GetInterval())
defer ticker.Stop()
for {
select {
case <-timer.C:
timer.Reset(s.GetInterval())
case <-ticker.C:
diagnosable := s.diagnosticRecorder.isAllowed()
if !s.AllowSchedule(diagnosable) {
continue
Expand Down
2 changes: 0 additions & 2 deletions pkg/utils/tsoutil/tso_dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -246,11 +246,9 @@ func WatchTSDeadline(ctx context.Context, tsDeadlineCh <-chan *TSDeadline) {
errs.ZapError(errs.ErrProxyTSOTimeout))
d.cancel()
timerPool.Put(d.timer) // it's safe to put the timer back to the pool
continue
case <-d.done:
d.timer.Stop() // not received from timer.C, so we need to stop the timer
timerPool.Put(d.timer)
continue
case <-ctx.Done():
d.timer.Stop() // not received from timer.C, so we need to stop the timer
timerPool.Put(d.timer)
Expand Down

0 comments on commit 135659c

Please sign in to comment.