From ff67696aa11d8681d6d513fa1e9c35d6fe46049c Mon Sep 17 00:00:00 2001 From: lhy1024 Date: Mon, 3 Jul 2023 16:20:13 +0800 Subject: [PATCH] *: fix memory leak introduced by timer.After (#6720) close tikv/pd#6719 Signed-off-by: lhy1024 Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com> --- client/client.go | 4 +- client/pd_service_discovery.go | 4 +- client/resource_manager_client.go | 4 +- client/timerpool/pool.go | 43 +++++++++++++++ client/timerpool/pool_test.go | 70 ++++++++++++++++++++++++ client/tso_dispatcher.go | 58 +++++++++++++++----- client/tso_service_discovery.go | 8 ++- client/tso_stream.go | 4 +- pkg/election/lease.go | 14 ++++- pkg/keyspace/keyspace.go | 2 + pkg/mcs/discovery/register.go | 3 + pkg/mcs/resourcemanager/server/server.go | 4 +- pkg/mcs/tso/server/server.go | 4 +- pkg/mcs/utils/util.go | 4 +- pkg/replication/replication_mode.go | 8 ++- pkg/schedule/coordinator.go | 19 ++++--- pkg/tso/keyspace_group_manager.go | 4 +- server/api/pprof.go | 4 +- server/grpc_service.go | 8 ++- server/server.go | 14 ++++- tools/pd-simulator/simulator/client.go | 23 ++++---- 21 files changed, 253 insertions(+), 53 deletions(-) create mode 100644 client/timerpool/pool.go create mode 100644 client/timerpool/pool_test.go diff --git a/client/client.go b/client/client.go index 0597f7344ee..5fbc5f9eb23 100644 --- a/client/client.go +++ b/client/client.go @@ -526,6 +526,8 @@ func newClientWithKeyspaceName( func (c *client) initRetry(f func(s string) error, str string) error { var err error + ticker := time.NewTicker(time.Second) + defer ticker.Stop() for i := 0; i < c.option.maxRetryTimes; i++ { if err = f(str); err == nil { return nil @@ -533,7 +535,7 @@ func (c *client) initRetry(f func(s string) error, str string) error { select { case <-c.ctx.Done(): return err - case <-time.After(time.Second): + case <-ticker.C: } } return errors.WithStack(err) diff --git a/client/pd_service_discovery.go b/client/pd_service_discovery.go index 9966b8c75cb..bd7ed31209a 100644 --- a/client/pd_service_discovery.go +++ b/client/pd_service_discovery.go @@ -206,6 +206,8 @@ func (c *pdServiceDiscovery) Init() error { func (c *pdServiceDiscovery) initRetry(f func() error) error { var err error + ticker := time.NewTicker(time.Second) + defer ticker.Stop() for i := 0; i < c.option.maxRetryTimes; i++ { if err = f(); err == nil { return nil @@ -213,7 +215,7 @@ func (c *pdServiceDiscovery) initRetry(f func() error) error { select { case <-c.ctx.Done(): return err - case <-time.After(time.Second): + case <-ticker.C: } } return errors.WithStack(err) diff --git a/client/resource_manager_client.go b/client/resource_manager_client.go index 160ccbd0fff..4b9896dfefa 100644 --- a/client/resource_manager_client.go +++ b/client/resource_manager_client.go @@ -389,6 +389,8 @@ func (c *client) tryResourceManagerConnect(ctx context.Context, connection *reso err error stream rmpb.ResourceManager_AcquireTokenBucketsClient ) + ticker := time.NewTicker(retryInterval) + defer ticker.Stop() for i := 0; i < maxRetryTimes; i++ { cc, err := c.resourceManagerClient() if err != nil { @@ -406,7 +408,7 @@ func (c *client) tryResourceManagerConnect(ctx context.Context, connection *reso select { case <-ctx.Done(): return err - case <-time.After(retryInterval): + case <-ticker.C: } } return err diff --git a/client/timerpool/pool.go b/client/timerpool/pool.go new file mode 100644 index 00000000000..28ffacfc629 --- /dev/null +++ b/client/timerpool/pool.go @@ -0,0 +1,43 @@ +// Copyright 2020 The Go Authors. 
All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +// Note: This file is copied from https://go-review.googlesource.com/c/go/+/276133 + +package timerpool + +import ( + "sync" + "time" +) + +// GlobalTimerPool is a global pool for reusing *time.Timer. +var GlobalTimerPool TimerPool + +// TimerPool is a wrapper of sync.Pool which caches *time.Timer for reuse. +type TimerPool struct { + pool sync.Pool +} + +// Get returns a timer with a given duration. +func (tp *TimerPool) Get(d time.Duration) *time.Timer { + if v := tp.pool.Get(); v != nil { + timer := v.(*time.Timer) + timer.Reset(d) + return timer + } + return time.NewTimer(d) +} + +// Put tries to call timer.Stop() before putting it back into pool, +// if the timer.Stop() returns false (it has either already expired or been stopped), +// have a shot at draining the channel with residual time if there is one. +func (tp *TimerPool) Put(timer *time.Timer) { + if !timer.Stop() { + select { + case <-timer.C: + default: + } + } + tp.pool.Put(timer) +} diff --git a/client/timerpool/pool_test.go b/client/timerpool/pool_test.go new file mode 100644 index 00000000000..d6dffc723a9 --- /dev/null +++ b/client/timerpool/pool_test.go @@ -0,0 +1,70 @@ +// Copyright 2020 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +// Note: This file is copied from https://go-review.googlesource.com/c/go/+/276133 + +package timerpool + +import ( + "testing" + "time" +) + +func TestTimerPool(t *testing.T) { + var tp TimerPool + + for i := 0; i < 100; i++ { + timer := tp.Get(20 * time.Millisecond) + + select { + case <-timer.C: + t.Errorf("timer expired too early") + continue + default: + } + + select { + case <-time.After(100 * time.Millisecond): + t.Errorf("timer didn't expire on time") + case <-timer.C: + } + + tp.Put(timer) + } +} + +const timeout = 10 * time.Millisecond + +func BenchmarkTimerUtilization(b *testing.B) { + b.Run("TimerWithPool", func(b *testing.B) { + for i := 0; i < b.N; i++ { + t := GlobalTimerPool.Get(timeout) + GlobalTimerPool.Put(t) + } + }) + b.Run("TimerWithoutPool", func(b *testing.B) { + for i := 0; i < b.N; i++ { + t := time.NewTimer(timeout) + t.Stop() + } + }) +} + +func BenchmarkTimerPoolParallel(b *testing.B) { + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + t := GlobalTimerPool.Get(timeout) + GlobalTimerPool.Put(t) + } + }) +} + +func BenchmarkTimerNativeParallel(b *testing.B) { + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + t := time.NewTimer(timeout) + t.Stop() + } + }) +} diff --git a/client/tso_dispatcher.go b/client/tso_dispatcher.go index 37bea8db9e5..9fd5c586bf9 100644 --- a/client/tso_dispatcher.go +++ b/client/tso_dispatcher.go @@ -27,6 +27,7 @@ import ( "github.com/pingcap/log" "github.com/tikv/pd/client/errs" "github.com/tikv/pd/client/grpcutil" + "github.com/tikv/pd/client/timerpool" "github.com/tikv/pd/client/tsoutil" "go.uber.org/zap" "google.golang.org/grpc" @@ -139,11 +140,24 @@ func (c *tsoClient) updateTSODispatcher() { } type deadline struct { - timer <-chan time.Time + timer *time.Timer done chan struct{} cancel context.CancelFunc } +func newTSDeadline( + timeout time.Duration, + done chan struct{}, + cancel context.CancelFunc, +) *deadline { + timer := timerpool.GlobalTimerPool.Get(timeout) + return &deadline{ + timer: timer, + done: done, + cancel: cancel, + } +} + func (c *tsoClient) tsCancelLoop() { defer 
c.wg.Done() @@ -172,19 +186,21 @@ func (c *tsoClient) tsCancelLoop() { func (c *tsoClient) watchTSDeadline(ctx context.Context, dcLocation string) { if _, exist := c.tsDeadline.Load(dcLocation); !exist { - tsDeadlineCh := make(chan deadline, 1) + tsDeadlineCh := make(chan *deadline, 1) c.tsDeadline.Store(dcLocation, tsDeadlineCh) - go func(dc string, tsDeadlineCh <-chan deadline) { + go func(dc string, tsDeadlineCh <-chan *deadline) { for { select { case d := <-tsDeadlineCh: select { - case <-d.timer: + case <-d.timer.C: log.Error("[tso] tso request is canceled due to timeout", zap.String("dc-location", dc), errs.ZapError(errs.ErrClientGetTSOTimeout)) d.cancel() + timerpool.GlobalTimerPool.Put(d.timer) case <-d.done: - continue + timerpool.GlobalTimerPool.Put(d.timer) case <-ctx.Done(): + timerpool.GlobalTimerPool.Put(d.timer) return } case <-ctx.Done(): @@ -234,6 +250,8 @@ func (c *tsoClient) checkAllocator( }() cc, u := c.GetTSOAllocatorClientConnByDCLocation(dc) healthCli := healthpb.NewHealthClient(cc) + ticker := time.NewTicker(time.Second) + defer ticker.Stop() for { // the pd/allocator leader change, we need to re-establish the stream if u != url { @@ -259,7 +277,7 @@ func (c *tsoClient) checkAllocator( select { case <-dispatcherCtx.Done(): return - case <-time.After(time.Second): + case <-ticker.C: // To ensure we can get the latest allocator leader // and once the leader is changed, we can exit this function. _, u = c.GetTSOAllocatorClientConnByDCLocation(dc) @@ -366,6 +384,7 @@ func (c *tsoClient) handleDispatcher( // Loop through each batch of TSO requests and send them for processing. streamLoopTimer := time.NewTimer(c.option.timeout) + defer streamLoopTimer.Stop() tsoBatchLoop: for { select { @@ -389,6 +408,15 @@ tsoBatchLoop: if maxBatchWaitInterval >= 0 { tbc.adjustBestBatchSize() } + // Stop the timer if it's not stopped. + if !streamLoopTimer.Stop() { + select { + case <-streamLoopTimer.C: // try to drain from the channel + default: + } + } + // We need be careful here, see more details in the comments of Timer.Reset. + // https://pkg.go.dev/time@master#Timer.Reset streamLoopTimer.Reset(c.option.timeout) // Choose a stream to send the TSO gRPC request. 
streamChoosingLoop: @@ -403,16 +431,20 @@ tsoBatchLoop: if c.updateTSOConnectionCtxs(dispatcherCtx, dc, &connectionCtxs) { continue streamChoosingLoop } + timer := time.NewTimer(retryInterval) select { case <-dispatcherCtx.Done(): + timer.Stop() return case <-streamLoopTimer.C: err = errs.ErrClientCreateTSOStream.FastGenByArgs(errs.RetryTimeoutErr) log.Error("[tso] create tso stream error", zap.String("dc-location", dc), errs.ZapError(err)) c.svcDiscovery.ScheduleCheckMemberChanged() c.finishRequest(tbc.getCollectedRequests(), 0, 0, 0, errors.WithStack(err)) + timer.Stop() continue tsoBatchLoop - case <-time.After(retryInterval): + case <-timer.C: + timer.Stop() continue streamChoosingLoop } } @@ -429,11 +461,7 @@ tsoBatchLoop: } } done := make(chan struct{}) - dl := deadline{ - timer: time.After(c.option.timeout), - done: done, - cancel: cancel, - } + dl := newTSDeadline(c.option.timeout, done, cancel) tsDeadlineCh, ok := c.tsDeadline.Load(dc) for !ok || tsDeadlineCh == nil { c.scheduleCheckTSDeadline() @@ -443,7 +471,7 @@ tsoBatchLoop: select { case <-dispatcherCtx.Done(): return - case tsDeadlineCh.(chan deadline) <- dl: + case tsDeadlineCh.(chan *deadline) <- dl: } opts = extractSpanReference(tbc, opts[:0]) err = c.processRequests(stream, dc, tbc, opts) @@ -558,6 +586,8 @@ func (c *tsoClient) tryConnectToTSO( } // retry several times before falling back to the follower when the network problem happens + ticker := time.NewTicker(retryInterval) + defer ticker.Stop() for i := 0; i < maxRetryTimes; i++ { c.svcDiscovery.ScheduleCheckMemberChanged() cc, url = c.GetTSOAllocatorClientConnByDCLocation(dc) @@ -587,7 +617,7 @@ func (c *tsoClient) tryConnectToTSO( select { case <-dispatcherCtx.Done(): return err - case <-time.After(retryInterval): + case <-ticker.C: } } diff --git a/client/tso_service_discovery.go b/client/tso_service_discovery.go index cee079634e9..6763e80976a 100644 --- a/client/tso_service_discovery.go +++ b/client/tso_service_discovery.go @@ -209,6 +209,8 @@ func (c *tsoServiceDiscovery) retry( maxRetryTimes int, retryInterval time.Duration, f func() error, ) error { var err error + ticker := time.NewTicker(retryInterval) + defer ticker.Stop() for i := 0; i < maxRetryTimes; i++ { if err = f(); err == nil { return nil @@ -216,7 +218,7 @@ func (c *tsoServiceDiscovery) retry( select { case <-c.ctx.Done(): return err - case <-time.After(retryInterval): + case <-ticker.C: } } return errors.WithStack(err) @@ -245,11 +247,13 @@ func (c *tsoServiceDiscovery) startCheckMemberLoop() { ctx, cancel := context.WithCancel(c.ctx) defer cancel() + ticker := time.NewTicker(memberUpdateInterval) + defer ticker.Stop() for { select { case <-c.checkMembershipCh: - case <-time.After(memberUpdateInterval): + case <-ticker.C: case <-ctx.Done(): log.Info("[tso] exit check member loop") return diff --git a/client/tso_stream.go b/client/tso_stream.go index 892512d8559..e3203818938 100644 --- a/client/tso_stream.go +++ b/client/tso_stream.go @@ -87,10 +87,12 @@ func (b *tsoTSOStreamBuilder) build( } func checkStreamTimeout(ctx context.Context, cancel context.CancelFunc, done chan struct{}, timeout time.Duration) { + timer := time.NewTimer(timeout) + defer timer.Stop() select { case <-done: return - case <-time.After(timeout): + case <-timer.C: cancel() case <-ctx.Done(): } diff --git a/pkg/election/lease.go b/pkg/election/lease.go index a0db045256f..1e3e66ddcce 100644 --- a/pkg/election/lease.go +++ b/pkg/election/lease.go @@ -109,6 +109,8 @@ func (l *lease) KeepAlive(ctx context.Context) { timeCh := 
l.keepAliveWorker(ctx, l.leaseTimeout/3) var maxExpire time.Time + timer := time.NewTimer(l.leaseTimeout) + defer timer.Stop() for { select { case t := <-timeCh: @@ -122,7 +124,17 @@ func (l *lease) KeepAlive(ctx context.Context) { l.expireTime.Store(t) } } - case <-time.After(l.leaseTimeout): + // Stop the timer if it's not stopped. + if !timer.Stop() { + select { + case <-timer.C: // try to drain from the channel + default: + } + } + // We need be careful here, see more details in the comments of Timer.Reset. + // https://pkg.go.dev/time@master#Timer.Reset + timer.Reset(l.leaseTimeout) + case <-timer.C: log.Info("lease timeout", zap.Time("expire", l.expireTime.Load().(time.Time)), zap.String("purpose", l.Purpose)) return case <-ctx.Done(): diff --git a/pkg/keyspace/keyspace.go b/pkg/keyspace/keyspace.go index df7eb653828..64dd1ba8622 100644 --- a/pkg/keyspace/keyspace.go +++ b/pkg/keyspace/keyspace.go @@ -334,6 +334,8 @@ func (manager *Manager) splitKeyspaceRegion(id uint32, waitRegionSplit bool) (er if region == nil || !bytes.Equal(region.GetStartKey(), txnRightBound) { continue } + // Note: we reset the ticker here to support updating configuration dynamically. + ticker.Reset(manager.config.GetCheckRegionSplitInterval()) case <-timer.C: log.Warn("[keyspace] wait region split timeout", zap.Uint32("keyspace-id", id), diff --git a/pkg/mcs/discovery/register.go b/pkg/mcs/discovery/register.go index 3e08d9b49cf..1e7800295de 100644 --- a/pkg/mcs/discovery/register.go +++ b/pkg/mcs/discovery/register.go @@ -86,6 +86,7 @@ func (sr *ServiceRegister) Register() error { select { case <-sr.ctx.Done(): log.Info("exit register process", zap.String("key", sr.key)) + t.Stop() return default: } @@ -94,11 +95,13 @@ func (sr *ServiceRegister) Register() error { resp, err := sr.cli.Grant(sr.ctx, sr.ttl) if err != nil { log.Error("grant lease failed", zap.String("key", sr.key), zap.Error(err)) + t.Stop() continue } if _, err := sr.cli.Put(sr.ctx, sr.key, sr.value, clientv3.WithLease(resp.ID)); err != nil { log.Error("put the key failed", zap.String("key", sr.key), zap.Error(err)) + t.Stop() continue } } diff --git a/pkg/mcs/resourcemanager/server/server.go b/pkg/mcs/resourcemanager/server/server.go index 6705c4b1da9..a0ccb52ed38 100644 --- a/pkg/mcs/resourcemanager/server/server.go +++ b/pkg/mcs/resourcemanager/server/server.go @@ -286,9 +286,11 @@ func (s *Server) startGRPCServer(l net.Listener) { gs.GracefulStop() close(done) }() + timer := time.NewTimer(utils.DefaultGRPCGracefulStopTimeout) + defer timer.Stop() select { case <-done: - case <-time.After(utils.DefaultGRPCGracefulStopTimeout): + case <-timer.C: log.Info("stopping grpc gracefully is taking longer than expected and force stopping now", zap.Duration("default", utils.DefaultGRPCGracefulStopTimeout)) gs.Stop() } diff --git a/pkg/mcs/tso/server/server.go b/pkg/mcs/tso/server/server.go index a0904f4dc7b..67e9c33f65c 100644 --- a/pkg/mcs/tso/server/server.go +++ b/pkg/mcs/tso/server/server.go @@ -595,6 +595,8 @@ func (s *Server) waitAPIServiceReady() error { ready bool err error ) + ticker := time.NewTicker(retryIntervalWaitAPIService) + defer ticker.Stop() for i := 0; i < maxRetryTimesWaitAPIService; i++ { ready, err = s.isAPIServiceReady() if err == nil && ready { @@ -604,7 +606,7 @@ func (s *Server) waitAPIServiceReady() error { select { case <-s.ctx.Done(): return errors.New("context canceled while waiting api server ready") - case <-time.After(retryIntervalWaitAPIService): + case <-ticker.C: } } if err != nil { diff --git 
a/pkg/mcs/utils/util.go b/pkg/mcs/utils/util.go index 1d66fc08c1c..a89c6c72b76 100644 --- a/pkg/mcs/utils/util.go +++ b/pkg/mcs/utils/util.go @@ -37,6 +37,8 @@ const ( // InitClusterID initializes the cluster ID. func InitClusterID(ctx context.Context, client *clientv3.Client) (id uint64, err error) { + ticker := time.NewTicker(retryInterval) + defer ticker.Stop() for i := 0; i < maxRetryTimes; i++ { if clusterID, err := etcdutil.GetClusterID(client, clusterIDPath); err == nil && clusterID != 0 { return clusterID, nil @@ -44,7 +46,7 @@ func InitClusterID(ctx context.Context, client *clientv3.Client) (id uint64, err select { case <-ctx.Done(): return 0, err - case <-time.After(retryInterval): + case <-ticker.C: } } return 0, errors.Errorf("failed to init cluster ID after retrying %d times", maxRetryTimes) diff --git a/pkg/replication/replication_mode.go b/pkg/replication/replication_mode.go index 03ed336fd9f..703dad03d26 100644 --- a/pkg/replication/replication_mode.go +++ b/pkg/replication/replication_mode.go @@ -375,14 +375,18 @@ const ( // Run starts the background job. func (m *ModeManager) Run(ctx context.Context) { // Wait for a while when just start, in case tikv do not connect in time. + timer := time.NewTimer(idleTimeout) + defer timer.Stop() select { - case <-time.After(idleTimeout): + case <-timer.C: case <-ctx.Done(): return } + ticker := time.NewTicker(tickInterval) + defer ticker.Stop() for { select { - case <-time.After(tickInterval): + case <-ticker.C: case <-ctx.Done(): return } diff --git a/pkg/schedule/coordinator.go b/pkg/schedule/coordinator.go index 419b6a7adae..a14a0ff556a 100644 --- a/pkg/schedule/coordinator.go +++ b/pkg/schedule/coordinator.go @@ -128,8 +128,8 @@ func (c *Coordinator) PatrolRegions() { defer logutil.LogPanic() defer c.wg.Done() - timer := time.NewTimer(c.cluster.GetOpts().GetPatrolRegionInterval()) - defer timer.Stop() + ticker := time.NewTicker(c.cluster.GetOpts().GetPatrolRegionInterval()) + defer ticker.Stop() log.Info("Coordinator starts patrol regions") start := time.Now() @@ -139,8 +139,9 @@ func (c *Coordinator) PatrolRegions() { ) for { select { - case <-timer.C: - timer.Reset(c.cluster.GetOpts().GetPatrolRegionInterval()) + case <-ticker.C: + // Note: we reset the ticker here to support updating configuration dynamically. + ticker.Reset(c.cluster.GetOpts().GetPatrolRegionInterval()) case <-c.ctx.Done(): log.Info("patrol regions has been stopped") return @@ -848,12 +849,11 @@ func (c *Coordinator) runScheduler(s *scheduleController) { defer c.wg.Done() defer s.Scheduler.Cleanup(c.cluster) - timer := time.NewTimer(s.GetInterval()) - defer timer.Stop() + ticker := time.NewTicker(s.GetInterval()) + defer ticker.Stop() for { select { - case <-timer.C: - timer.Reset(s.GetInterval()) + case <-ticker.C: diagnosable := s.diagnosticRecorder.isAllowed() if !s.AllowSchedule(diagnosable) { continue @@ -862,7 +862,8 @@ func (c *Coordinator) runScheduler(s *scheduleController) { added := c.opController.AddWaitingOperator(op...) log.Debug("add operator", zap.Int("added", added), zap.Int("total", len(op)), zap.String("scheduler", s.Scheduler.GetName())) } - + // Note: we reset the ticker here to support updating configuration dynamically. 
+ ticker.Reset(s.GetInterval()) case <-s.Ctx().Done(): log.Info("scheduler has been stopped", zap.String("scheduler-name", s.Scheduler.GetName()), diff --git a/pkg/tso/keyspace_group_manager.go b/pkg/tso/keyspace_group_manager.go index 0291bc5863d..8064c54bf39 100644 --- a/pkg/tso/keyspace_group_manager.go +++ b/pkg/tso/keyspace_group_manager.go @@ -568,6 +568,8 @@ func (kgm *KeyspaceGroupManager) primaryPriorityCheckLoop() { kgm.primaryPriorityCheckInterval = 200 * time.Millisecond }) + ticker := time.NewTicker(kgm.primaryPriorityCheckInterval) + defer ticker.Stop() ctx, cancel := context.WithCancel(kgm.ctx) defer cancel() groupID := 0 @@ -576,7 +578,7 @@ func (kgm *KeyspaceGroupManager) primaryPriorityCheckLoop() { case <-ctx.Done(): log.Info("exit primary priority check loop") return - case <-time.After(kgm.primaryPriorityCheckInterval): + case <-ticker.C: // Every primaryPriorityCheckInterval, we only reset the primary of one keyspace group member, kg, localPriority, nextGroupID := kgm.getNextPrimaryToReset(groupID, kgm.tsoServiceID.ServiceAddr) if member != nil { diff --git a/server/api/pprof.go b/server/api/pprof.go index b64278a21b7..900c48f8368 100644 --- a/server/api/pprof.go +++ b/server/api/pprof.go @@ -209,8 +209,10 @@ func (h *pprofHandler) PProfThreadcreate(w http.ResponseWriter, r *http.Request) } func sleepWithCtx(ctx context.Context, d time.Duration) { + timer := time.NewTimer(d) + defer timer.Stop() select { - case <-time.After(d): + case <-timer.C: case <-ctx.Done(): } } diff --git a/server/grpc_service.go b/server/grpc_service.go index f66bd37ed11..83ea0c35dfe 100644 --- a/server/grpc_service.go +++ b/server/grpc_service.go @@ -960,13 +960,15 @@ func (b *bucketHeartbeatServer) Send(bucket *pdpb.ReportBucketsResponse) error { defer logutil.LogPanic() done <- b.stream.SendAndClose(bucket) }() + timer := time.NewTimer(heartbeatSendTimeout) + defer timer.Stop() select { case err := <-done: if err != nil { atomic.StoreInt32(&b.closed, 1) } return err - case <-time.After(heartbeatSendTimeout): + case <-timer.C: atomic.StoreInt32(&b.closed, 1) return ErrSendHeartbeatTimeout } @@ -1000,13 +1002,15 @@ func (s *heartbeatServer) Send(m *pdpb.RegionHeartbeatResponse) error { defer logutil.LogPanic() done <- s.stream.Send(m) }() + timer := time.NewTimer(heartbeatSendTimeout) + defer timer.Stop() select { case err := <-done: if err != nil { atomic.StoreInt32(&s.closed, 1) } return errors.WithStack(err) - case <-time.After(heartbeatSendTimeout): + case <-timer.C: atomic.StoreInt32(&s.closed, 1) return ErrSendHeartbeatTimeout } diff --git a/server/server.go b/server/server.go index 08d6896a3ef..ae1b47d9b40 100644 --- a/server/server.go +++ b/server/server.go @@ -617,9 +617,11 @@ func (s *Server) serverMetricsLoop() { ctx, cancel := context.WithCancel(s.serverLoopCtx) defer cancel() + ticker := time.NewTicker(serverMetricsInterval) + defer ticker.Stop() for { select { - case <-time.After(serverMetricsInterval): + case <-ticker.C: s.collectEtcdStateMetrics() case <-ctx.Done(): log.Info("server is closed, exit metrics loop") @@ -1672,10 +1674,14 @@ func (s *Server) etcdLeaderLoop() { ctx, cancel := context.WithCancel(s.serverLoopCtx) defer cancel() + ticker := time.NewTicker(s.cfg.LeaderPriorityCheckInterval.Duration) + defer ticker.Stop() for { select { - case <-time.After(s.cfg.LeaderPriorityCheckInterval.Duration): + case <-ticker.C: s.member.CheckPriority(ctx) + // Note: we reset the ticker here to support updating configuration dynamically. 
+ ticker.Reset(s.cfg.LeaderPriorityCheckInterval.Duration) case <-ctx.Done(): log.Info("server is closed, exit etcd leader loop") return @@ -1815,6 +1821,8 @@ func (s *Server) UnmarkSnapshotRecovering(ctx context.Context) error { // GetServicePrimaryAddr returns the primary address for a given service. // Note: This function will only return primary address without judging if it's alive. func (s *Server) GetServicePrimaryAddr(ctx context.Context, serviceName string) (string, bool) { + ticker := time.NewTicker(retryIntervalGetServicePrimary) + defer ticker.Stop() for i := 0; i < maxRetryTimesGetServicePrimary; i++ { if v, ok := s.servicePrimaryMap.Load(serviceName); ok { return v.(string), true @@ -1824,7 +1832,7 @@ func (s *Server) GetServicePrimaryAddr(ctx context.Context, serviceName string) return "", false case <-ctx.Done(): return "", false - case <-time.After(retryIntervalGetServicePrimary): + case <-ticker.C: } } return "", false diff --git a/tools/pd-simulator/simulator/client.go b/tools/pd-simulator/simulator/client.go index 36e0d27869b..8dd1ee1646e 100644 --- a/tools/pd-simulator/simulator/client.go +++ b/tools/pd-simulator/simulator/client.go @@ -149,21 +149,22 @@ func (c *client) createHeartbeatStream() (pdpb.PD_RegionHeartbeatClient, context cancel context.CancelFunc ctx context.Context ) + ticker := time.NewTicker(time.Second) + defer ticker.Stop() for { ctx, cancel = context.WithCancel(c.ctx) stream, err = c.pdClient().RegionHeartbeat(ctx) - if err != nil { - simutil.Logger.Error("create region heartbeat stream error", zap.String("tag", c.tag), zap.Error(err)) - cancel() - select { - case <-time.After(time.Second): - continue - case <-c.ctx.Done(): - simutil.Logger.Info("cancel create stream loop") - return nil, ctx, cancel - } + if err == nil { + break + } + simutil.Logger.Error("create region heartbeat stream error", zap.String("tag", c.tag), zap.Error(err)) + cancel() + select { + case <-c.ctx.Done(): + simutil.Logger.Info("cancel create stream loop") + return nil, ctx, cancel + case <-ticker.C: } - break } return stream, ctx, cancel }
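
The common pattern behind the hunks above: time.After(d) inside a loop allocates a fresh runtime timer on every iteration, and each one is only reclaimed after it fires, so long-lived retry and wait loops accumulate timers (the leak reported in tikv/pd#6719). The fix is to create one time.Ticker, or one reusable time.Timer, outside the loop, stop it with defer, and follow the drain-before-Reset rule from the Timer.Reset documentation. What follows is a minimal, self-contained sketch of the two variants, not code from this patch; retryLoop and waitLoop are hypothetical helpers standing in for call sites such as initRetry, handleDispatcher, and lease.KeepAlive.

package main

import (
	"context"
	"fmt"
	"time"
)

// Before the fix: `case <-time.After(interval)` inside the loop body creates a
// new timer per iteration, none of which is released until it fires.
// After the fix: one ticker is allocated outside the loop and reused.
func retryLoop(ctx context.Context, attempts int, interval time.Duration, f func() error) error {
	ticker := time.NewTicker(interval)
	defer ticker.Stop() // release the ticker as soon as the loop exits
	var err error
	for i := 0; i < attempts; i++ {
		if err = f(); err == nil {
			return nil
		}
		select {
		case <-ctx.Done():
			return err
		case <-ticker.C: // reused channel, no per-iteration allocation
		}
	}
	return err
}

// When the timeout must restart from zero after each event (a ticker cannot
// express that), a single timer is reused and drained before Reset, mirroring
// the streamLoopTimer and lease.KeepAlive changes above.
func waitLoop(ctx context.Context, timeout time.Duration, events <-chan struct{}) {
	timer := time.NewTimer(timeout)
	defer timer.Stop()
	for {
		select {
		case <-events:
			// Stop the timer and drain a value that may have already been
			// delivered; otherwise the stale value would make the next wait
			// return immediately.
			if !timer.Stop() {
				select {
				case <-timer.C:
				default:
				}
			}
			timer.Reset(timeout)
		case <-timer.C:
			fmt.Println("timed out")
			return
		case <-ctx.Done():
			return
		}
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()

	err := retryLoop(ctx, 3, 50*time.Millisecond, func() error { return fmt.Errorf("not ready") })
	fmt.Println("retry result:", err)

	events := make(chan struct{}, 1)
	events <- struct{}{}
	waitLoop(ctx, 100*time.Millisecond, events)
}

The client/timerpool package introduced by this patch covers the remaining case, where a timer's lifetime is tied to an individual request (the per-request TSO deadline in tso_dispatcher.go) and therefore cannot be hoisted out of a loop; instead the timer is recycled through a sync.Pool via GlobalTimerPool.Get and GlobalTimerPool.Put.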