From 1ae3f522c8061d1fb125c88ca4460796be43930a Mon Sep 17 00:00:00 2001 From: lhy1024 Date: Thu, 29 Jun 2023 19:30:16 +0800 Subject: [PATCH] *: fix memory leak introduced by timer.After Signed-off-by: lhy1024 --- client/client.go | 4 +- client/pd_service_discovery.go | 4 +- client/resource_manager_client.go | 4 +- client/tso_dispatcher.go | 57 +++++++++++++++++++------- client/tso_service_discovery.go | 8 +++- client/tso_stream.go | 4 +- pkg/election/lease.go | 5 ++- pkg/mcs/tso/server/server.go | 4 +- pkg/mcs/utils/util.go | 4 +- pkg/replication/replication_mode.go | 8 +++- pkg/tso/keyspace_group_manager.go | 4 +- pkg/utils/tsoutil/tso_dispatcher.go | 23 ++++++++--- server/api/pprof.go | 4 +- server/grpc_service.go | 20 ++++++--- server/server.go | 12 ++++-- tools/pd-simulator/simulator/client.go | 23 ++++++----- 16 files changed, 137 insertions(+), 51 deletions(-) diff --git a/client/client.go b/client/client.go index 0597f7344ee..5fbc5f9eb23 100644 --- a/client/client.go +++ b/client/client.go @@ -526,6 +526,8 @@ func newClientWithKeyspaceName( func (c *client) initRetry(f func(s string) error, str string) error { var err error + ticker := time.NewTicker(time.Second) + defer ticker.Stop() for i := 0; i < c.option.maxRetryTimes; i++ { if err = f(str); err == nil { return nil @@ -533,7 +535,7 @@ func (c *client) initRetry(f func(s string) error, str string) error { select { case <-c.ctx.Done(): return err - case <-time.After(time.Second): + case <-ticker.C: } } return errors.WithStack(err) diff --git a/client/pd_service_discovery.go b/client/pd_service_discovery.go index 9966b8c75cb..bd7ed31209a 100644 --- a/client/pd_service_discovery.go +++ b/client/pd_service_discovery.go @@ -206,6 +206,8 @@ func (c *pdServiceDiscovery) Init() error { func (c *pdServiceDiscovery) initRetry(f func() error) error { var err error + ticker := time.NewTicker(time.Second) + defer ticker.Stop() for i := 0; i < c.option.maxRetryTimes; i++ { if err = f(); err == nil { return nil @@ -213,7 +215,7 @@ func (c *pdServiceDiscovery) initRetry(f func() error) error { select { case <-c.ctx.Done(): return err - case <-time.After(time.Second): + case <-ticker.C: } } return errors.WithStack(err) diff --git a/client/resource_manager_client.go b/client/resource_manager_client.go index 160ccbd0fff..4b9896dfefa 100644 --- a/client/resource_manager_client.go +++ b/client/resource_manager_client.go @@ -389,6 +389,8 @@ func (c *client) tryResourceManagerConnect(ctx context.Context, connection *reso err error stream rmpb.ResourceManager_AcquireTokenBucketsClient ) + ticker := time.NewTicker(retryInterval) + defer ticker.Stop() for i := 0; i < maxRetryTimes; i++ { cc, err := c.resourceManagerClient() if err != nil { @@ -406,7 +408,7 @@ func (c *client) tryResourceManagerConnect(ctx context.Context, connection *reso select { case <-ctx.Done(): return err - case <-time.After(retryInterval): + case <-ticker.C: } } return err diff --git a/client/tso_dispatcher.go b/client/tso_dispatcher.go index 37bea8db9e5..a41618ee7e6 100644 --- a/client/tso_dispatcher.go +++ b/client/tso_dispatcher.go @@ -138,12 +138,34 @@ func (c *tsoClient) updateTSODispatcher() { }) } -type deadline struct { - timer <-chan time.Time +var timerPool = sync.Pool{ + New: func() interface{} { + return time.NewTimer(defaultPDTimeout) + }, +} + +// TSDeadline is used to watch the deadline of each tso request. +type TSDeadline struct { + timer *time.Timer done chan struct{} cancel context.CancelFunc } +// NewTSDeadline creates a new TSDeadline. +func NewTSDeadline( + timeout time.Duration, + done chan struct{}, + cancel context.CancelFunc, +) *TSDeadline { + timer := timerPool.Get().(*time.Timer) + timer.Reset(timeout) + return &TSDeadline{ + timer: timer, + done: done, + cancel: cancel, + } +} + func (c *tsoClient) tsCancelLoop() { defer c.wg.Done() @@ -172,17 +194,21 @@ func (c *tsoClient) tsCancelLoop() { func (c *tsoClient) watchTSDeadline(ctx context.Context, dcLocation string) { if _, exist := c.tsDeadline.Load(dcLocation); !exist { - tsDeadlineCh := make(chan deadline, 1) + tsDeadlineCh := make(chan *TSDeadline, 1) c.tsDeadline.Store(dcLocation, tsDeadlineCh) - go func(dc string, tsDeadlineCh <-chan deadline) { + go func(dc string, tsDeadlineCh <-chan *TSDeadline) { for { select { case d := <-tsDeadlineCh: select { - case <-d.timer: + case <-d.timer.C: log.Error("[tso] tso request is canceled due to timeout", zap.String("dc-location", dc), errs.ZapError(errs.ErrClientGetTSOTimeout)) d.cancel() + timerPool.Put(d.timer) // it's safe to put the timer back to the pool + continue case <-d.done: + d.timer.Stop() // not received from timer.C, so we need to stop the timer + timerPool.Put(d.timer) continue case <-ctx.Done(): return @@ -234,6 +260,8 @@ func (c *tsoClient) checkAllocator( }() cc, u := c.GetTSOAllocatorClientConnByDCLocation(dc) healthCli := healthpb.NewHealthClient(cc) + ticker := time.NewTicker(time.Second) + defer ticker.Stop() for { // the pd/allocator leader change, we need to re-establish the stream if u != url { @@ -259,7 +287,7 @@ func (c *tsoClient) checkAllocator( select { case <-dispatcherCtx.Done(): return - case <-time.After(time.Second): + case <-ticker.C: // To ensure we can get the latest allocator leader // and once the leader is changed, we can exit this function. _, u = c.GetTSOAllocatorClientConnByDCLocation(dc) @@ -403,16 +431,19 @@ tsoBatchLoop: if c.updateTSOConnectionCtxs(dispatcherCtx, dc, &connectionCtxs) { continue streamChoosingLoop } + timer := time.NewTimer(retryInterval) select { case <-dispatcherCtx.Done(): + timer.Stop() return case <-streamLoopTimer.C: err = errs.ErrClientCreateTSOStream.FastGenByArgs(errs.RetryTimeoutErr) log.Error("[tso] create tso stream error", zap.String("dc-location", dc), errs.ZapError(err)) c.svcDiscovery.ScheduleCheckMemberChanged() c.finishRequest(tbc.getCollectedRequests(), 0, 0, 0, errors.WithStack(err)) + timer.Stop() continue tsoBatchLoop - case <-time.After(retryInterval): + case <-timer.C: continue streamChoosingLoop } } @@ -429,11 +460,7 @@ tsoBatchLoop: } } done := make(chan struct{}) - dl := deadline{ - timer: time.After(c.option.timeout), - done: done, - cancel: cancel, - } + dl := NewTSDeadline(c.option.timeout, done, cancel) tsDeadlineCh, ok := c.tsDeadline.Load(dc) for !ok || tsDeadlineCh == nil { c.scheduleCheckTSDeadline() @@ -443,7 +470,7 @@ tsoBatchLoop: select { case <-dispatcherCtx.Done(): return - case tsDeadlineCh.(chan deadline) <- dl: + case tsDeadlineCh.(chan *TSDeadline) <- dl: } opts = extractSpanReference(tbc, opts[:0]) err = c.processRequests(stream, dc, tbc, opts) @@ -558,6 +585,8 @@ func (c *tsoClient) tryConnectToTSO( } // retry several times before falling back to the follower when the network problem happens + ticker := time.NewTicker(retryInterval) + defer ticker.Stop() for i := 0; i < maxRetryTimes; i++ { c.svcDiscovery.ScheduleCheckMemberChanged() cc, url = c.GetTSOAllocatorClientConnByDCLocation(dc) @@ -587,7 +616,7 @@ func (c *tsoClient) tryConnectToTSO( select { case <-dispatcherCtx.Done(): return err - case <-time.After(retryInterval): + case <-ticker.C: } } diff --git a/client/tso_service_discovery.go b/client/tso_service_discovery.go index cee079634e9..6763e80976a 100644 --- a/client/tso_service_discovery.go +++ b/client/tso_service_discovery.go @@ -209,6 +209,8 @@ func (c *tsoServiceDiscovery) retry( maxRetryTimes int, retryInterval time.Duration, f func() error, ) error { var err error + ticker := time.NewTicker(retryInterval) + defer ticker.Stop() for i := 0; i < maxRetryTimes; i++ { if err = f(); err == nil { return nil @@ -216,7 +218,7 @@ func (c *tsoServiceDiscovery) retry( select { case <-c.ctx.Done(): return err - case <-time.After(retryInterval): + case <-ticker.C: } } return errors.WithStack(err) @@ -245,11 +247,13 @@ func (c *tsoServiceDiscovery) startCheckMemberLoop() { ctx, cancel := context.WithCancel(c.ctx) defer cancel() + ticker := time.NewTicker(memberUpdateInterval) + defer ticker.Stop() for { select { case <-c.checkMembershipCh: - case <-time.After(memberUpdateInterval): + case <-ticker.C: case <-ctx.Done(): log.Info("[tso] exit check member loop") return diff --git a/client/tso_stream.go b/client/tso_stream.go index 892512d8559..e3203818938 100644 --- a/client/tso_stream.go +++ b/client/tso_stream.go @@ -87,10 +87,12 @@ func (b *tsoTSOStreamBuilder) build( } func checkStreamTimeout(ctx context.Context, cancel context.CancelFunc, done chan struct{}, timeout time.Duration) { + timer := time.NewTimer(timeout) + defer timer.Stop() select { case <-done: return - case <-time.After(timeout): + case <-timer.C: cancel() case <-ctx.Done(): } diff --git a/pkg/election/lease.go b/pkg/election/lease.go index a0db045256f..aef9755b98d 100644 --- a/pkg/election/lease.go +++ b/pkg/election/lease.go @@ -109,6 +109,8 @@ func (l *lease) KeepAlive(ctx context.Context) { timeCh := l.keepAliveWorker(ctx, l.leaseTimeout/3) var maxExpire time.Time + timer := time.NewTimer(l.leaseTimeout) + defer timer.Stop() for { select { case t := <-timeCh: @@ -122,7 +124,8 @@ func (l *lease) KeepAlive(ctx context.Context) { l.expireTime.Store(t) } } - case <-time.After(l.leaseTimeout): + timer.Reset(l.leaseTimeout) + case <-timer.C: log.Info("lease timeout", zap.Time("expire", l.expireTime.Load().(time.Time)), zap.String("purpose", l.Purpose)) return case <-ctx.Done(): diff --git a/pkg/mcs/tso/server/server.go b/pkg/mcs/tso/server/server.go index a0904f4dc7b..67e9c33f65c 100644 --- a/pkg/mcs/tso/server/server.go +++ b/pkg/mcs/tso/server/server.go @@ -595,6 +595,8 @@ func (s *Server) waitAPIServiceReady() error { ready bool err error ) + ticker := time.NewTicker(retryIntervalWaitAPIService) + defer ticker.Stop() for i := 0; i < maxRetryTimesWaitAPIService; i++ { ready, err = s.isAPIServiceReady() if err == nil && ready { @@ -604,7 +606,7 @@ func (s *Server) waitAPIServiceReady() error { select { case <-s.ctx.Done(): return errors.New("context canceled while waiting api server ready") - case <-time.After(retryIntervalWaitAPIService): + case <-ticker.C: } } if err != nil { diff --git a/pkg/mcs/utils/util.go b/pkg/mcs/utils/util.go index 1d66fc08c1c..a89c6c72b76 100644 --- a/pkg/mcs/utils/util.go +++ b/pkg/mcs/utils/util.go @@ -37,6 +37,8 @@ const ( // InitClusterID initializes the cluster ID. func InitClusterID(ctx context.Context, client *clientv3.Client) (id uint64, err error) { + ticker := time.NewTicker(retryInterval) + defer ticker.Stop() for i := 0; i < maxRetryTimes; i++ { if clusterID, err := etcdutil.GetClusterID(client, clusterIDPath); err == nil && clusterID != 0 { return clusterID, nil @@ -44,7 +46,7 @@ func InitClusterID(ctx context.Context, client *clientv3.Client) (id uint64, err select { case <-ctx.Done(): return 0, err - case <-time.After(retryInterval): + case <-ticker.C: } } return 0, errors.Errorf("failed to init cluster ID after retrying %d times", maxRetryTimes) diff --git a/pkg/replication/replication_mode.go b/pkg/replication/replication_mode.go index 03ed336fd9f..703dad03d26 100644 --- a/pkg/replication/replication_mode.go +++ b/pkg/replication/replication_mode.go @@ -375,14 +375,18 @@ const ( // Run starts the background job. func (m *ModeManager) Run(ctx context.Context) { // Wait for a while when just start, in case tikv do not connect in time. + timer := time.NewTimer(idleTimeout) + defer timer.Stop() select { - case <-time.After(idleTimeout): + case <-timer.C: case <-ctx.Done(): return } + ticker := time.NewTicker(tickInterval) + defer ticker.Stop() for { select { - case <-time.After(tickInterval): + case <-ticker.C: case <-ctx.Done(): return } diff --git a/pkg/tso/keyspace_group_manager.go b/pkg/tso/keyspace_group_manager.go index a82376430fa..d76f1cf59e7 100644 --- a/pkg/tso/keyspace_group_manager.go +++ b/pkg/tso/keyspace_group_manager.go @@ -535,6 +535,8 @@ func (kgm *KeyspaceGroupManager) primaryPriorityCheckLoop() { kgm.primaryPriorityCheckInterval = 200 * time.Millisecond }) + ticker := time.NewTicker(kgm.primaryPriorityCheckInterval) + defer ticker.Stop() ctx, cancel := context.WithCancel(kgm.ctx) defer cancel() groupID := 0 @@ -543,7 +545,7 @@ func (kgm *KeyspaceGroupManager) primaryPriorityCheckLoop() { case <-ctx.Done(): log.Info("exit primary priority check loop") return - case <-time.After(kgm.primaryPriorityCheckInterval): + case <-ticker.C: // Every primaryPriorityCheckInterval, we only reset the primary of one keyspace group member, kg, localPriority, nextGroupID := kgm.getNextPrimaryToReset(groupID, kgm.tsoServiceID.ServiceAddr) if member != nil { diff --git a/pkg/utils/tsoutil/tso_dispatcher.go b/pkg/utils/tsoutil/tso_dispatcher.go index 69baf4b1e41..2679226124b 100644 --- a/pkg/utils/tsoutil/tso_dispatcher.go +++ b/pkg/utils/tsoutil/tso_dispatcher.go @@ -195,9 +195,15 @@ func (s *TSODispatcher) finishRequest(requests []Request, physical, firstLogical return nil } +var timerPool = sync.Pool{ + New: func() interface{} { + return time.NewTimer(DefaultTSOProxyTimeout) + }, +} + // TSDeadline is used to watch the deadline of each tso request. type TSDeadline struct { - timer <-chan time.Time + timer *time.Timer done chan struct{} cancel context.CancelFunc } @@ -208,8 +214,10 @@ func NewTSDeadline( done chan struct{}, cancel context.CancelFunc, ) *TSDeadline { + timer := timerPool.Get().(*time.Timer) + timer.Reset(timeout) return &TSDeadline{ - timer: time.After(timeout), + timer: timer, done: done, cancel: cancel, } @@ -224,11 +232,15 @@ func WatchTSDeadline(ctx context.Context, tsDeadlineCh <-chan *TSDeadline) { select { case d := <-tsDeadlineCh: select { - case <-d.timer: + case <-d.timer.C: log.Error("tso proxy request processing is canceled due to timeout", errs.ZapError(errs.ErrProxyTSOTimeout)) d.cancel() + timerPool.Put(d.timer) // it's safe to put the timer back to the pool + continue case <-d.done: + d.timer.Stop() // not received from timer.C, so we need to stop the timer + timerPool.Put(d.timer) continue case <-ctx.Done(): return @@ -241,11 +253,12 @@ func WatchTSDeadline(ctx context.Context, tsDeadlineCh <-chan *TSDeadline) { func checkStream(streamCtx context.Context, cancel context.CancelFunc, done chan struct{}) { defer logutil.LogPanic() - + timer := time.NewTimer(3 * time.Second) + defer timer.Stop() select { case <-done: return - case <-time.After(3 * time.Second): + case <-timer.C: cancel() case <-streamCtx.Done(): } diff --git a/server/api/pprof.go b/server/api/pprof.go index b64278a21b7..900c48f8368 100644 --- a/server/api/pprof.go +++ b/server/api/pprof.go @@ -209,8 +209,10 @@ func (h *pprofHandler) PProfThreadcreate(w http.ResponseWriter, r *http.Request) } func sleepWithCtx(ctx context.Context, d time.Duration) { + timer := time.NewTimer(d) + defer timer.Stop() select { - case <-time.After(d): + case <-timer.C: case <-ctx.Done(): } } diff --git a/server/grpc_service.go b/server/grpc_service.go index 1badabb19d8..83ea0c35dfe 100644 --- a/server/grpc_service.go +++ b/server/grpc_service.go @@ -606,13 +606,15 @@ func (s *tsoServer) Send(m *pdpb.TsoResponse) error { }) done <- s.stream.Send(m) }() + timer := time.NewTimer(tsoutil.DefaultTSOProxyTimeout) + defer timer.Stop() select { case err := <-done: if err != nil { atomic.StoreInt32(&s.closed, 1) } return errors.WithStack(err) - case <-time.After(tsoutil.DefaultTSOProxyTimeout): + case <-timer.C: atomic.StoreInt32(&s.closed, 1) return ErrForwardTSOTimeout } @@ -633,6 +635,8 @@ func (s *tsoServer) Recv(timeout time.Duration) (*pdpb.TsoRequest, error) { request, err := s.stream.Recv() requestCh <- &pdpbTSORequest{request: request, err: err} }() + timer := time.NewTimer(timeout) + defer timer.Stop() select { case req := <-requestCh: if req.err != nil { @@ -640,7 +644,7 @@ func (s *tsoServer) Recv(timeout time.Duration) (*pdpb.TsoRequest, error) { return nil, errors.WithStack(req.err) } return req.request, nil - case <-time.After(timeout): + case <-timer.C: atomic.StoreInt32(&s.closed, 1) return nil, ErrTSOProxyRecvFromClientTimeout } @@ -956,13 +960,15 @@ func (b *bucketHeartbeatServer) Send(bucket *pdpb.ReportBucketsResponse) error { defer logutil.LogPanic() done <- b.stream.SendAndClose(bucket) }() + timer := time.NewTimer(heartbeatSendTimeout) + defer timer.Stop() select { case err := <-done: if err != nil { atomic.StoreInt32(&b.closed, 1) } return err - case <-time.After(heartbeatSendTimeout): + case <-timer.C: atomic.StoreInt32(&b.closed, 1) return ErrSendHeartbeatTimeout } @@ -996,13 +1002,15 @@ func (s *heartbeatServer) Send(m *pdpb.RegionHeartbeatResponse) error { defer logutil.LogPanic() done <- s.stream.Send(m) }() + timer := time.NewTimer(heartbeatSendTimeout) + defer timer.Stop() select { case err := <-done: if err != nil { atomic.StoreInt32(&s.closed, 1) } return errors.WithStack(err) - case <-time.After(heartbeatSendTimeout): + case <-timer.C: atomic.StoreInt32(&s.closed, 1) return ErrSendHeartbeatTimeout } @@ -2173,10 +2181,12 @@ func forwardReportBucketClientToServer(forwardStream pdpb.PD_ReportBucketsClient // TODO: If goroutine here timeout when tso stream created successfully, we need to handle it correctly. func checkStream(streamCtx context.Context, cancel context.CancelFunc, done chan struct{}) { defer logutil.LogPanic() + timer := time.NewTimer(3 * time.Second) + defer timer.Stop() select { case <-done: return - case <-time.After(3 * time.Second): + case <-timer.C: cancel() case <-streamCtx.Done(): } diff --git a/server/server.go b/server/server.go index 40f5f4d59bd..36637c753ae 100644 --- a/server/server.go +++ b/server/server.go @@ -617,9 +617,11 @@ func (s *Server) serverMetricsLoop() { ctx, cancel := context.WithCancel(s.serverLoopCtx) defer cancel() + ticker := time.NewTicker(serverMetricsInterval) + defer ticker.Stop() for { select { - case <-time.After(serverMetricsInterval): + case <-ticker.C: s.collectEtcdStateMetrics() case <-ctx.Done(): log.Info("server is closed, exit metrics loop") @@ -1672,9 +1674,11 @@ func (s *Server) etcdLeaderLoop() { ctx, cancel := context.WithCancel(s.serverLoopCtx) defer cancel() + ticker := time.NewTicker(s.cfg.LeaderPriorityCheckInterval.Duration) + defer ticker.Stop() for { select { - case <-time.After(s.cfg.LeaderPriorityCheckInterval.Duration): + case <-ticker.C: s.member.CheckPriority(ctx) case <-ctx.Done(): log.Info("server is closed, exit etcd leader loop") @@ -1815,6 +1819,8 @@ func (s *Server) UnmarkSnapshotRecovering(ctx context.Context) error { // GetServicePrimaryAddr returns the primary address for a given service. // Note: This function will only return primary address without judging if it's alive. func (s *Server) GetServicePrimaryAddr(ctx context.Context, serviceName string) (string, bool) { + ticker := time.NewTicker(retryIntervalGetServicePrimary) + defer ticker.Stop() for i := 0; i < maxRetryTimesGetServicePrimary; i++ { if v, ok := s.servicePrimaryMap.Load(serviceName); ok { return v.(string), true @@ -1824,7 +1830,7 @@ func (s *Server) GetServicePrimaryAddr(ctx context.Context, serviceName string) return "", false case <-ctx.Done(): return "", false - case <-time.After(retryIntervalGetServicePrimary): + case <-ticker.C: } } return "", false diff --git a/tools/pd-simulator/simulator/client.go b/tools/pd-simulator/simulator/client.go index 36e0d27869b..8dd1ee1646e 100644 --- a/tools/pd-simulator/simulator/client.go +++ b/tools/pd-simulator/simulator/client.go @@ -149,21 +149,22 @@ func (c *client) createHeartbeatStream() (pdpb.PD_RegionHeartbeatClient, context cancel context.CancelFunc ctx context.Context ) + ticker := time.NewTicker(time.Second) + defer ticker.Stop() for { ctx, cancel = context.WithCancel(c.ctx) stream, err = c.pdClient().RegionHeartbeat(ctx) - if err != nil { - simutil.Logger.Error("create region heartbeat stream error", zap.String("tag", c.tag), zap.Error(err)) - cancel() - select { - case <-time.After(time.Second): - continue - case <-c.ctx.Done(): - simutil.Logger.Info("cancel create stream loop") - return nil, ctx, cancel - } + if err == nil { + break + } + simutil.Logger.Error("create region heartbeat stream error", zap.String("tag", c.tag), zap.Error(err)) + cancel() + select { + case <-c.ctx.Done(): + simutil.Logger.Info("cancel create stream loop") + return nil, ctx, cancel + case <-ticker.C: } - break } return stream, ctx, cancel }