From 5bdf91ea062fc54a318e2154bff90010550f44da Mon Sep 17 00:00:00 2001 From: Joachim DB Systel Date: Mon, 4 Nov 2024 11:11:58 +0100 Subject: [PATCH 1/3] fixed merge window time drift issue See: https://github.com/influxdata/telegraf/issues/13403 --- models/running_aggregator.go | 28 +++++++++++++++++++++++++++- 1 file changed, 27 insertions(+), 1 deletion(-) diff --git a/models/running_aggregator.go b/models/running_aggregator.go index 56d487a2255ed..fb13ec5639c79 100644 --- a/models/running_aggregator.go +++ b/models/running_aggregator.go @@ -174,8 +174,34 @@ func (r *RunningAggregator) Push(acc telegraf.Accumulator) { r.Lock() defer r.Unlock() + // In case of time drift forward (e.g. after sleep) + // we will have intentionally a long period, so + // metrics got stuck in meantime will not lost. since := r.periodEnd - until := r.periodEnd.Add(r.Config.Period) + until := time.Now().Add(r.Config.Period) + + // Truncate() eliminates the monotonic clock from the + // time which otherwise may lead to miscalculation of + // the duration. We want to calculate the duration + // based on the wall clock time. The check, if a metric + // is discarded or not, ist based on the wall clock + // time. + duration := until.Truncate(-1).Sub(since.Truncate(-1)) + + if duration < r.Config.Period { + // In case of time drift backwards, a new + // period based on now is constructed. + since = time.Now() + until = since.Add(r.Config.Period) + } + + // Note: + // If the time drifted out of the merge window + // and a metric with that new time is pushed and + // the merge window was not yet adjusted, this + // metric will discarded anyway. There is no way + // to prevent this :-( + r.UpdateWindow(since, until) start := time.Now() From 1bdadcdc757d324db87aded4111e126101125783 Mon Sep 17 00:00:00 2001 From: JoachimSchachermayer Date: Mon, 4 Nov 2024 17:08:22 +0100 Subject: [PATCH 2/3] fixed fmt issues --- models/running_aggregator.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/models/running_aggregator.go b/models/running_aggregator.go index fb13ec5639c79..edfe9b56aa7b8 100644 --- a/models/running_aggregator.go +++ b/models/running_aggregator.go @@ -201,7 +201,7 @@ func (r *RunningAggregator) Push(acc telegraf.Accumulator) { // the merge window was not yet adjusted, this // metric will discarded anyway. There is no way // to prevent this :-( - + r.UpdateWindow(since, until) start := time.Now() From ec57fe3ff92f05068ad08fdf7799e949c6febc59 Mon Sep 17 00:00:00 2001 From: JoachimSchachermayer Date: Tue, 7 Jan 2025 09:35:27 +0100 Subject: [PATCH 3/3] small comment change --- models/running_aggregator.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/models/running_aggregator.go b/models/running_aggregator.go index 4629939175e74..ef5f0c92e3157 100644 --- a/models/running_aggregator.go +++ b/models/running_aggregator.go @@ -183,9 +183,9 @@ func (r *RunningAggregator) Push(acc telegraf.Accumulator) { // Truncate() eliminates the monotonic clock from the // time which otherwise may lead to miscalculation of // the duration. We want to calculate the duration - // based on the wall clock time. The check, if a metric - // is discarded or not, ist based on the wall clock - // time. + // based on the wall clock time because the check, if + // a metric is discarded or not, is based on the wall + // clock time (see Add() some lines above). duration := until.Truncate(-1).Sub(since.Truncate(-1)) if duration < r.Config.Period {