Skip to content

Commit

Permalink
Fixed timers leaks
Browse files Browse the repository at this point in the history
  • Loading branch information
rekby committed Apr 1, 2024
1 parent f41615b commit 894d91b
Show file tree
Hide file tree
Showing 7 changed files with 42 additions and 10 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
* Fixed leak timers
* Changed default StartTime (time of retries for connect to server) for topic writer from 1 minute to infinite (can be overrided by WithWriterStartTimeout topic option)
* Added `Struct` support for `Variant` in `ydb.ParamsBuilder()`
* Added `go` with anonymous function case in `gstack`
Expand Down
24 changes: 19 additions & 5 deletions internal/coordination/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,10 +139,12 @@ func (s *session) newStream(

var client Ydb_Coordination_V1.CoordinationService_SessionClient
if lastChance {
timer := time.NewTimer(s.options.SessionKeepAliveTimeout)
select {
case <-time.After(s.options.SessionKeepAliveTimeout):
case <-timer.C:
case client = <-result:
}
timer.Stop()

if client != nil {
return client, nil
Expand Down Expand Up @@ -175,10 +177,12 @@ func (s *session) newStream(
}

// Waiting for some time before trying to reconnect.
sessionReconnectDelay := time.NewTimer(s.options.SessionReconnectDelay)
select {
case <-time.After(s.options.SessionReconnectDelay):
case <-sessionReconnectDelay.C:
case <-s.ctx.Done():
}
sessionReconnectDelay.Stop()

if s.ctx.Err() != nil {
// Give this session the last chance to stop gracefully if the session is canceled in the reconnect cycle.
Expand Down Expand Up @@ -247,6 +251,7 @@ func (s *session) mainLoop(path string, sessionStartedChan chan struct{}) {

// Wait for the session started response unless the stream context is done. We intentionally do not take into
// account stream context cancellation in order to proceed with the graceful shutdown if it requires reconnect.
sessionStartTimer := time.NewTimer(s.options.SessionStartTimeout)
select {
case start := <-sessionStarted:
trace.CoordinationOnSessionStarted(s.client.config.Trace(), start.GetSessionId(), s.sessionID)
Expand All @@ -258,13 +263,14 @@ func (s *session) mainLoop(path string, sessionStartedChan chan struct{}) {
cancelStream()
}
close(startSending)
case <-time.After(s.options.SessionStartTimeout):
case <-sessionStartTimer.C:
// Reconnect if no response was received before the timeout occurred.
trace.CoordinationOnSessionStartTimeout(s.client.config.Trace(), s.options.SessionStartTimeout)
cancelStream()
case <-streamCtx.Done():
case <-s.ctx.Done():
}
sessionStartTimer.Stop()

for {
// Respect the failure reason priority: if the session context is done, we must stop the session, even
Expand All @@ -280,8 +286,9 @@ func (s *session) mainLoop(path string, sessionStartedChan chan struct{}) {
}

keepAliveTime := time.Until(s.getLastGoodResponseTime().Add(s.options.SessionKeepAliveTimeout))
keepAliveTimeTimer := time.NewTimer(keepAliveTime)
select {
case <-time.After(keepAliveTime):
case <-keepAliveTimeTimer.C:
last := s.getLastGoodResponseTime()
if time.Since(last) > s.options.SessionKeepAliveTimeout {
// Reconnect if the underlying stream is likely to be dead.
Expand All @@ -295,6 +302,7 @@ func (s *session) mainLoop(path string, sessionStartedChan chan struct{}) {
case <-streamCtx.Done():
case <-s.ctx.Done():
}
keepAliveTimeTimer.Stop()
}

if closing {
Expand All @@ -318,8 +326,10 @@ func (s *session) mainLoop(path string, sessionStartedChan chan struct{}) {
)

// Wait for the session stopped response unless the stream context is done.
sessionStopTimeout := time.NewTimer(s.options.SessionStopTimeout)
select {
case stop := <-sessionStopped:
sessionStopTimeout.Stop()
trace.CoordinationOnSessionStopped(s.client.config.Trace(), stop.GetSessionId(), s.sessionID)
if stop.GetSessionId() == s.sessionID {
cancelStream()
Expand All @@ -329,15 +339,19 @@ func (s *session) mainLoop(path string, sessionStartedChan chan struct{}) {

// Reconnect if the server response is invalid.
cancelStream()
case <-time.After(s.options.SessionStopTimeout):
case <-sessionStopTimeout.C:
sessionStopTimeout.Stop() // no really need, call stop for common style only

// Reconnect if no response was received before the timeout occurred.
trace.CoordinationOnSessionStopTimeout(s.client.config.Trace(), s.options.SessionStopTimeout)
cancelStream()
case <-s.ctx.Done():
sessionStopTimeout.Stop()
cancelStream()

return
case <-streamCtx.Done():
sessionStopTimeout.Stop()
}
}

Expand Down
5 changes: 4 additions & 1 deletion internal/table/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -489,7 +489,10 @@ func (c *Client) internalPoolWaitFromCh(ctx context.Context, t *trace.Table) (s

var createSessionTimeoutCh <-chan time.Time
if timeout := c.config.CreateSessionTimeout(); timeout > 0 {
createSessionTimeoutCh = c.clock.After(timeout)
createSessionTimeoutChTimer := c.clock.NewTimer(timeout)
defer createSessionTimeoutChTimer.Stop()

createSessionTimeoutCh = createSessionTimeoutChTimer.Chan()
}

select {
Expand Down
5 changes: 4 additions & 1 deletion internal/topic/topicreaderinternal/committer.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,10 @@ func (c *committer) waitSendTrigger(ctx context.Context) {
return
}

finish := c.clock.After(c.BufferTimeLagTrigger)
bufferTimeLagTriggerTimer := c.clock.NewTimer(c.BufferTimeLagTrigger)
defer bufferTimeLagTriggerTimer.Stop()

finish := bufferTimeLagTriggerTimer.Chan()
if c.BufferCountTrigger == 0 {
select {
case <-ctxDone:
Expand Down
5 changes: 4 additions & 1 deletion internal/topic/topicreaderinternal/stream_reconnector.go
Original file line number Diff line number Diff line change
Expand Up @@ -328,9 +328,12 @@ func (r *readerReconnector) connectWithTimeout() (_ batchedStreamReader, err err
result <- connectResult{stream: stream, err: err}
}()

connectionTimoutTimer := r.clock.NewTimer(r.connectTimeout)
defer connectionTimoutTimer.Stop()

var res connectResult
select {
case <-r.clock.After(r.connectTimeout):
case <-connectionTimoutTimer.Chan():
// cancel connection context only if timeout exceed while connection
// because if cancel context after connect - it will break
cancel()
Expand Down
6 changes: 5 additions & 1 deletion internal/topic/topicwriterinternal/writer_reconnector.go
Original file line number Diff line number Diff line change
Expand Up @@ -383,10 +383,14 @@ func (w *WriterReconnector) connectionLoop(ctx context.Context) {
retryDuration := w.clock.Since(startOfRetries)
if backoff, retry := topic.CheckRetryMode(reconnectReason, w.retrySettings, retryDuration); retry {
delay := backoff.Delay(attempt)
delayTimer := w.clock.NewTimer(delay)
select {
case <-doneCtx:
delayTimer.Stop()

return
case <-w.clock.After(delay):
case <-delayTimer.Chan():
delayTimer.Stop() // no really need, stop for common style only
// pass
}
} else {
Expand Down
6 changes: 5 additions & 1 deletion internal/xsql/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -262,10 +262,14 @@ func (c *Connector) idleCloser() (idleStopper func()) {
ctx, idleStopper = xcontext.WithCancel(context.Background())
go func() {
for {
idleThresholdTimer := c.clock.NewTimer(c.idleThreshold)
select {
case <-ctx.Done():
idleThresholdTimer.Stop()

return
case <-c.clock.After(c.idleThreshold):
case <-idleThresholdTimer.Chan():
idleThresholdTimer.Stop() // no really need, stop for common style only
c.connsMtx.RLock()
conns := make([]*conn, 0, len(c.conns))
for cc := range c.conns {
Expand Down

0 comments on commit 894d91b

Please sign in to comment.