diff --git a/pkg/monitoring/poller.go b/pkg/monitoring/poller.go index 2c186a845..cbad5e1c0 100644 --- a/pkg/monitoring/poller.go +++ b/pkg/monitoring/poller.go @@ -5,6 +5,9 @@ import ( "errors" "fmt" "time" + + "github.com/smartcontractkit/chainlink-common/pkg/services" + "github.com/smartcontractkit/chainlink-common/pkg/timeutil" ) // Poller implements Updater by periodically invoking a Source's Fetch() method. @@ -63,21 +66,25 @@ func (s *sourcePoller) Run(ctx context.Context) { } } - reusedTimer := time.NewTimer(s.pollInterval) + ticker := services.TickerConfig{ + Initial: timeutil.JitterPct(0.1).Apply(s.pollInterval), + JitterPct: 0.2, + }.NewTicker(s.pollInterval) + defer ticker.Stop() for { select { - case <-reusedTimer.C: + case <-ticker.C: data, err := s.executeFetch(ctx) if err != nil { if errors.Is(err, ErrNoUpdate) { s.log.Debugw("no update found") - reusedTimer.Reset(s.pollInterval) + ticker.Reset() continue } else if errors.Is(err, context.Canceled) { return } s.log.Errorw("failed to fetch from source", "error", err) - reusedTimer.Reset(s.pollInterval) + ticker.Reset() continue } select { @@ -85,11 +92,8 @@ func (s *sourcePoller) Run(ctx context.Context) { case <-ctx.Done(): return } - reusedTimer.Reset(s.pollInterval) + ticker.Reset() case <-ctx.Done(): - if !reusedTimer.Stop() { - <-reusedTimer.C - } return } } diff --git a/pkg/monitoring/poller_test.go b/pkg/monitoring/poller_test.go index 3027d7087..18c1c3266 100644 --- a/pkg/monitoring/poller_test.go +++ b/pkg/monitoring/poller_test.go @@ -31,7 +31,7 @@ func TestPoller(t *testing.T) { 0, 0, 4, - 5, + 6, }, { "slow fetching, quick polling, no buffering", @@ -42,7 +42,7 @@ func TestPoller(t *testing.T) { 0, 0, 20, - 50, + 60, }, { "fast fetch, fast polling, insufficient buffering, tons of backpressure", @@ -53,7 +53,7 @@ func TestPoller(t *testing.T) { 200 * time.Millisecond, // time it gets the "consumer" to process a message. It will only be able to process 1000/200=5 updates per second. 5, 4, - 5, + 6, }, } { t.Run(testCase.name, func(t *testing.T) {