From 24cd924d84014a9053eb6f36545da529a0fb9d88 Mon Sep 17 00:00:00 2001 From: lhy1024 Date: Fri, 30 Jun 2023 15:47:16 +0800 Subject: [PATCH] copy again to avoid import from pd/pkg in client reset ticker to support dynamic config Signed-off-by: lhy1024 --- client/timerpool/pool.go | 43 +++++++++++++++++++++ client/timerpool/pool_test.go | 70 +++++++++++++++++++++++++++++++++++ client/tso_dispatcher.go | 2 +- pkg/keyspace/keyspace.go | 2 + pkg/schedule/coordinator.go | 5 ++- server/server.go | 2 + 6 files changed, 122 insertions(+), 2 deletions(-) create mode 100644 client/timerpool/pool.go create mode 100644 client/timerpool/pool_test.go diff --git a/client/timerpool/pool.go b/client/timerpool/pool.go new file mode 100644 index 00000000000..28ffacfc629 --- /dev/null +++ b/client/timerpool/pool.go @@ -0,0 +1,43 @@ +// Copyright 2020 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +// Note: This file is copied from https://go-review.googlesource.com/c/go/+/276133 + +package timerpool + +import ( + "sync" + "time" +) + +// GlobalTimerPool is a global pool for reusing *time.Timer. +var GlobalTimerPool TimerPool + +// TimerPool is a wrapper of sync.Pool which caches *time.Timer for reuse. +type TimerPool struct { + pool sync.Pool +} + +// Get returns a timer with a given duration. +func (tp *TimerPool) Get(d time.Duration) *time.Timer { + if v := tp.pool.Get(); v != nil { + timer := v.(*time.Timer) + timer.Reset(d) + return timer + } + return time.NewTimer(d) +} + +// Put tries to call timer.Stop() before putting it back into pool, +// if the timer.Stop() returns false (it has either already expired or been stopped), +// have a shot at draining the channel with residual time if there is one. +func (tp *TimerPool) Put(timer *time.Timer) { + if !timer.Stop() { + select { + case <-timer.C: + default: + } + } + tp.pool.Put(timer) +} diff --git a/client/timerpool/pool_test.go b/client/timerpool/pool_test.go new file mode 100644 index 00000000000..d6dffc723a9 --- /dev/null +++ b/client/timerpool/pool_test.go @@ -0,0 +1,70 @@ +// Copyright 2020 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +// Note: This file is copied from https://go-review.googlesource.com/c/go/+/276133 + +package timerpool + +import ( + "testing" + "time" +) + +func TestTimerPool(t *testing.T) { + var tp TimerPool + + for i := 0; i < 100; i++ { + timer := tp.Get(20 * time.Millisecond) + + select { + case <-timer.C: + t.Errorf("timer expired too early") + continue + default: + } + + select { + case <-time.After(100 * time.Millisecond): + t.Errorf("timer didn't expire on time") + case <-timer.C: + } + + tp.Put(timer) + } +} + +const timeout = 10 * time.Millisecond + +func BenchmarkTimerUtilization(b *testing.B) { + b.Run("TimerWithPool", func(b *testing.B) { + for i := 0; i < b.N; i++ { + t := GlobalTimerPool.Get(timeout) + GlobalTimerPool.Put(t) + } + }) + b.Run("TimerWithoutPool", func(b *testing.B) { + for i := 0; i < b.N; i++ { + t := time.NewTimer(timeout) + t.Stop() + } + }) +} + +func BenchmarkTimerPoolParallel(b *testing.B) { + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + t := GlobalTimerPool.Get(timeout) + GlobalTimerPool.Put(t) + } + }) +} + +func BenchmarkTimerNativeParallel(b *testing.B) { + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + t := time.NewTimer(timeout) + t.Stop() + } + }) +} diff --git a/client/tso_dispatcher.go b/client/tso_dispatcher.go index d29e9004ea6..0200d7deba9 100644 --- a/client/tso_dispatcher.go +++ b/client/tso_dispatcher.go @@ -27,8 +27,8 @@ import ( "github.com/pingcap/log" "github.com/tikv/pd/client/errs" "github.com/tikv/pd/client/grpcutil" + "github.com/tikv/pd/client/timerpool" "github.com/tikv/pd/client/tsoutil" - "github.com/tikv/pd/pkg/timerpool" "go.uber.org/zap" "google.golang.org/grpc" "google.golang.org/grpc/codes" diff --git a/pkg/keyspace/keyspace.go b/pkg/keyspace/keyspace.go index adcb0be3106..a763cafd422 100644 --- a/pkg/keyspace/keyspace.go +++ b/pkg/keyspace/keyspace.go @@ -334,6 +334,8 @@ func (manager *Manager) splitKeyspaceRegion(id uint32, waitRegionSplit bool) (er if region == nil || !bytes.Equal(region.GetStartKey(), txnRightBound) { continue } + // Note: we reset the ticker here to support updating configuration dynamically. + ticker.Reset(manager.config.GetCheckRegionSplitInterval()) case <-timer.C: log.Warn("[keyspace] wait region split timeout", zap.Uint32("keyspace-id", id), diff --git a/pkg/schedule/coordinator.go b/pkg/schedule/coordinator.go index d0dbec3183d..a14a0ff556a 100644 --- a/pkg/schedule/coordinator.go +++ b/pkg/schedule/coordinator.go @@ -140,6 +140,8 @@ func (c *Coordinator) PatrolRegions() { for { select { case <-ticker.C: + // Note: we reset the ticker here to support updating configuration dynamically. + ticker.Reset(c.cluster.GetOpts().GetPatrolRegionInterval()) case <-c.ctx.Done(): log.Info("patrol regions has been stopped") return @@ -860,7 +862,8 @@ func (c *Coordinator) runScheduler(s *scheduleController) { added := c.opController.AddWaitingOperator(op...) log.Debug("add operator", zap.Int("added", added), zap.Int("total", len(op)), zap.String("scheduler", s.Scheduler.GetName())) } - + // Note: we reset the ticker here to support updating configuration dynamically. + ticker.Reset(s.GetInterval()) case <-s.Ctx().Done(): log.Info("scheduler has been stopped", zap.String("scheduler-name", s.Scheduler.GetName()), diff --git a/server/server.go b/server/server.go index 36637c753ae..355dd834f35 100644 --- a/server/server.go +++ b/server/server.go @@ -1680,6 +1680,8 @@ func (s *Server) etcdLeaderLoop() { select { case <-ticker.C: s.member.CheckPriority(ctx) + // Note: we reset the ticker here to support updating configuration dynamically. + ticker.Reset(s.cfg.LeaderPriorityCheckInterval.Duration) case <-ctx.Done(): log.Info("server is closed, exit etcd leader loop") return