Skip to content

Commit

Permalink
fix: Fix scheduling issues that can cause unexpected timeouts on new …
Browse files Browse the repository at this point in the history
…connections
  • Loading branch information
driskell committed Jul 30, 2024
1 parent 3f0e8f3 commit b0980cc
Show file tree
Hide file tree
Showing 3 changed files with 5 additions and 1 deletion.
1 change: 1 addition & 0 deletions lc-lib/publisher/endpoint/sink_process.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ func (s *Sink) ProcessEvent(event transports.Event) (endpoint *Endpoint, err err
err = fmt.Errorf("unexpected %T message received", event)
}

s.Scheduler.Reschedule()
return
}

Expand Down
4 changes: 3 additions & 1 deletion lc-lib/publisher/publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,9 @@ func (p *Publisher) runOnce() bool {
} else if p.resendList.Len() != 0 {
log.Debug("Holding %d new events until the resend queue is flushed", len(spool))
} else if p.endpointSink.CanQueue() {
if _, ok := p.sendEvents(spool); ok {
_, ok := p.sendEvents(spool)
p.endpointSink.Scheduler.Reschedule()
if ok {
break
}

Expand Down
1 change: 1 addition & 0 deletions lc-lib/receiver/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,7 @@ ReceiverLoop:
// Schedule partial ack if this is first set of events
if len(connectionStatus.progress) == 0 {
r.scheduler.Set(connection, 5*time.Second)
r.scheduler.Reschedule()
}
connectionStatus.progress = append(connectionStatus.progress, &poolEventProgress{event: eventImpl, sequence: 0})
r.connectionLock.Unlock()
Expand Down

0 comments on commit b0980cc

Please sign in to comment.