Skip to content

Commit

Permalink
copy again to avoid import from pd/pkg in client
Browse files Browse the repository at this point in the history
reset ticker to support dynamic config

Signed-off-by: lhy1024 <[email protected]>
  • Loading branch information
lhy1024 committed Jun 30, 2023
1 parent e37073f commit 24cd924
Show file tree
Hide file tree
Showing 6 changed files with 122 additions and 2 deletions.
43 changes: 43 additions & 0 deletions client/timerpool/pool.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
// Copyright 2020 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

// Note: This file is copied from https://go-review.googlesource.com/c/go/+/276133

package timerpool

import (
"sync"
"time"
)

// GlobalTimerPool is a global pool for reusing *time.Timer.
var GlobalTimerPool TimerPool

// TimerPool is a wrapper of sync.Pool which caches *time.Timer for reuse.
type TimerPool struct {
pool sync.Pool
}

// Get returns a timer with a given duration.
func (tp *TimerPool) Get(d time.Duration) *time.Timer {
if v := tp.pool.Get(); v != nil {
timer := v.(*time.Timer)
timer.Reset(d)
return timer
}
return time.NewTimer(d)
}

// Put tries to call timer.Stop() before putting it back into pool,
// if the timer.Stop() returns false (it has either already expired or been stopped),
// have a shot at draining the channel with residual time if there is one.
func (tp *TimerPool) Put(timer *time.Timer) {
if !timer.Stop() {
select {
case <-timer.C:
default:
}
}
tp.pool.Put(timer)
}
70 changes: 70 additions & 0 deletions client/timerpool/pool_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
// Copyright 2020 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

// Note: This file is copied from https://go-review.googlesource.com/c/go/+/276133

package timerpool

import (
"testing"
"time"
)

func TestTimerPool(t *testing.T) {
var tp TimerPool

for i := 0; i < 100; i++ {
timer := tp.Get(20 * time.Millisecond)

select {
case <-timer.C:
t.Errorf("timer expired too early")
continue
default:
}

select {
case <-time.After(100 * time.Millisecond):
t.Errorf("timer didn't expire on time")
case <-timer.C:
}

tp.Put(timer)
}
}

const timeout = 10 * time.Millisecond

func BenchmarkTimerUtilization(b *testing.B) {
b.Run("TimerWithPool", func(b *testing.B) {
for i := 0; i < b.N; i++ {
t := GlobalTimerPool.Get(timeout)
GlobalTimerPool.Put(t)
}
})
b.Run("TimerWithoutPool", func(b *testing.B) {
for i := 0; i < b.N; i++ {
t := time.NewTimer(timeout)
t.Stop()
}
})
}

func BenchmarkTimerPoolParallel(b *testing.B) {
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
t := GlobalTimerPool.Get(timeout)
GlobalTimerPool.Put(t)
}
})
}

func BenchmarkTimerNativeParallel(b *testing.B) {
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
t := time.NewTimer(timeout)
t.Stop()
}
})
}
2 changes: 1 addition & 1 deletion client/tso_dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ import (
"github.com/pingcap/log"
"github.com/tikv/pd/client/errs"
"github.com/tikv/pd/client/grpcutil"
"github.com/tikv/pd/client/timerpool"
"github.com/tikv/pd/client/tsoutil"
"github.com/tikv/pd/pkg/timerpool"
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
Expand Down
2 changes: 2 additions & 0 deletions pkg/keyspace/keyspace.go
Original file line number Diff line number Diff line change
Expand Up @@ -334,6 +334,8 @@ func (manager *Manager) splitKeyspaceRegion(id uint32, waitRegionSplit bool) (er
if region == nil || !bytes.Equal(region.GetStartKey(), txnRightBound) {
continue
}
// Note: we reset the ticker here to support updating configuration dynamically.
ticker.Reset(manager.config.GetCheckRegionSplitInterval())
case <-timer.C:
log.Warn("[keyspace] wait region split timeout",
zap.Uint32("keyspace-id", id),
Expand Down
5 changes: 4 additions & 1 deletion pkg/schedule/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,8 @@ func (c *Coordinator) PatrolRegions() {
for {
select {
case <-ticker.C:
// Note: we reset the ticker here to support updating configuration dynamically.
ticker.Reset(c.cluster.GetOpts().GetPatrolRegionInterval())
case <-c.ctx.Done():
log.Info("patrol regions has been stopped")
return
Expand Down Expand Up @@ -860,7 +862,8 @@ func (c *Coordinator) runScheduler(s *scheduleController) {
added := c.opController.AddWaitingOperator(op...)
log.Debug("add operator", zap.Int("added", added), zap.Int("total", len(op)), zap.String("scheduler", s.Scheduler.GetName()))
}

// Note: we reset the ticker here to support updating configuration dynamically.
ticker.Reset(s.GetInterval())
case <-s.Ctx().Done():
log.Info("scheduler has been stopped",
zap.String("scheduler-name", s.Scheduler.GetName()),
Expand Down
2 changes: 2 additions & 0 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -1680,6 +1680,8 @@ func (s *Server) etcdLeaderLoop() {
select {
case <-ticker.C:
s.member.CheckPriority(ctx)
// Note: we reset the ticker here to support updating configuration dynamically.
ticker.Reset(s.cfg.LeaderPriorityCheckInterval.Duration)
case <-ctx.Done():
log.Info("server is closed, exit etcd leader loop")
return
Expand Down

0 comments on commit 24cd924

Please sign in to comment.