From c9f973f087e05a0bf006bcbfaec2542b06ab0184 Mon Sep 17 00:00:00 2001 From: Calvin Kirs Date: Fri, 6 Dec 2024 11:22:51 +0800 Subject: [PATCH] [fix](job)Fix millisecond offset issue in time window scheduling trigger time calculation ### Abstract: In the current time window scheduling logic, the calculation of trigger times was not strictly aligned to the second level, which could lead to millisecond offsets. This offset caused issues such as consecutive trigger times at 14:56:59 and 14:57:00, disrupting the correctness of the scheduling. This PR optimizes the calculation of trigger times to ensure that time points are strictly aligned to the second level, preventing the accumulation of millisecond errors. ### Issue Description: Under a specified window (e.g., 14:50:00 to 14:59:00) and a fixed interval (e.g., every minute), the scheduler generated erroneous trigger times such as: ``` | 2024-12-04 14:56:59 | | 2024-12-04 14:57:00 | | 2024-12-04 14:57:59 | | 2024-12-04 14:58:00 | ``` #### Cause: The current firstTriggerTime and the loop calculation did not strictly align trigger times to the second level, resulting in erroneous trigger points due to floating-point or millisecond offset accumulation. The end condition for the time window was not aligned to the second level, which could lead to additional trigger times being included. ### Fix: Modification 1: Strictly align the trigger time to the second level. --- .../org/apache/doris/common/util/TimeUtils.java | 11 +++++++++++ .../doris/job/base/JobExecutionConfiguration.java | 2 +- .../org/apache/doris/job/base/TimerDefinition.java | 7 ++++++- .../apache/doris/job/scheduler/JobScheduler.java | 13 +++++++++---- .../job/base/JobExecutionConfigurationTest.java | 7 +++++++ 5 files changed, 34 insertions(+), 6 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/TimeUtils.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/TimeUtils.java index e7066846c30919..d88971a6e72bde 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/util/TimeUtils.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/TimeUtils.java @@ -257,6 +257,17 @@ public static long timeStringToLong(String timeStr) { return d.getTime(); } + /** + * Converts a millisecond timestamp to a second-level timestamp. + * + * @param timestamp The millisecond timestamp to be converted. + * @return The timestamp rounded to the nearest second (in milliseconds). + */ + public static long convertToSecondTimestamp(long timestamp) { + // Divide by 1000 to convert to seconds, then multiply by 1000 to return to milliseconds with no fractional part + return (timestamp / 1000) * 1000; + } + public static long timeStringToLong(String timeStr, TimeZone timeZone) { DateTimeFormatter dateFormatTimeZone = getDatetimeFormatWithTimeZone(); dateFormatTimeZone.withZone(timeZone.toZoneId()); diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/base/JobExecutionConfiguration.java b/fe/fe-core/src/main/java/org/apache/doris/job/base/JobExecutionConfiguration.java index 4c6ef4d2037f86..d564b1143122ec 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/base/JobExecutionConfiguration.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/base/JobExecutionConfiguration.java @@ -155,7 +155,7 @@ private Long queryDelayTimeSecond(Long currentTimeMs, Long startTimeMs) { return 0L; } - return (startTimeMs - currentTimeMs) / 1000; + return (startTimeMs * 1000 / 1000 - currentTimeMs) / 1000; } // Returns a list of delay times in seconds for executing the job within the specified window diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/base/TimerDefinition.java b/fe/fe-core/src/main/java/org/apache/doris/job/base/TimerDefinition.java index 9068a18f693e12..96181877b9a568 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/base/TimerDefinition.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/base/TimerDefinition.java @@ -17,6 +17,7 @@ package org.apache.doris.job.base; +import org.apache.doris.common.util.TimeUtils; import org.apache.doris.job.common.IntervalUnit; import com.google.gson.annotations.SerializedName; @@ -40,11 +41,15 @@ public class TimerDefinition { public void checkParams() { if (null == startTimeMs) { - startTimeMs = System.currentTimeMillis() + intervalUnit.getIntervalMs(interval); + long currentTimeMs = TimeUtils.convertToSecondTimestamp(System.currentTimeMillis()); + startTimeMs = currentTimeMs + intervalUnit.getIntervalMs(interval); } if (null != endTimeMs && endTimeMs < startTimeMs) { throw new IllegalArgumentException("endTimeMs must be greater than the start time"); } + if (null != endTimeMs) { + endTimeMs = TimeUtils.convertToSecondTimestamp(endTimeMs); + } if (null != intervalUnit) { if (null == interval) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/scheduler/JobScheduler.java b/fe/fe-core/src/main/java/org/apache/doris/job/scheduler/JobScheduler.java index 7f8b39f1e66dc7..d6d1d09c503753 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/scheduler/JobScheduler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/scheduler/JobScheduler.java @@ -84,7 +84,8 @@ public void start() { taskDisruptorGroupManager = new TaskDisruptorGroupManager(); taskDisruptorGroupManager.init(); this.timerJobDisruptor = taskDisruptorGroupManager.getDispatchDisruptor(); - latestBatchSchedulerTimerTaskTimeMs = System.currentTimeMillis(); + long currentTimeMs = TimeUtils.convertToSecondTimestamp(System.currentTimeMillis()); + latestBatchSchedulerTimerTaskTimeMs = currentTimeMs; batchSchedulerTimerJob(); cycleSystemSchedulerTasks(); } @@ -94,7 +95,8 @@ public void start() { * Jobs will be re-registered after the task is completed */ private void cycleSystemSchedulerTasks() { - log.info("re-register system scheduler timer tasks" + TimeUtils.longToTimeString(System.currentTimeMillis())); + log.info("re-register system scheduler timer tasks, time is " + TimeUtils + .longToTimeStringWithms(System.currentTimeMillis())); timerTaskScheduler.newTimeout(timeout -> { batchSchedulerTimerJob(); cycleSystemSchedulerTasks(); @@ -144,7 +146,9 @@ public void close() throws IOException { private void cycleTimerJobScheduler(T job, long startTimeWindowMs) { - List delaySeconds = job.getJobConfig().getTriggerDelayTimes(System.currentTimeMillis(), + long currentTimeMs = TimeUtils.convertToSecondTimestamp(System.currentTimeMillis()); + startTimeWindowMs = TimeUtils.convertToSecondTimestamp(startTimeWindowMs); + List delaySeconds = job.getJobConfig().getTriggerDelayTimes(currentTimeMs, startTimeWindowMs, latestBatchSchedulerTimerTaskTimeMs); if (CollectionUtils.isNotEmpty(delaySeconds)) { delaySeconds.forEach(delaySecond -> { @@ -185,7 +189,8 @@ private void executeTimerJobIdsWithinLastTenMinutesWindow() { long lastTimeWindowMs = latestBatchSchedulerTimerTaskTimeMs; if (latestBatchSchedulerTimerTaskTimeMs < System.currentTimeMillis()) { - this.latestBatchSchedulerTimerTaskTimeMs = System.currentTimeMillis(); + long currentTimeMs = TimeUtils.convertToSecondTimestamp(System.currentTimeMillis()); + this.latestBatchSchedulerTimerTaskTimeMs = currentTimeMs; } this.latestBatchSchedulerTimerTaskTimeMs += BATCH_SCHEDULER_INTERVAL_MILLI_SECONDS; if (jobMap.isEmpty()) { diff --git a/fe/fe-core/src/test/java/org/apache/doris/job/base/JobExecutionConfigurationTest.java b/fe/fe-core/src/test/java/org/apache/doris/job/base/JobExecutionConfigurationTest.java index cce0a93c01daf8..163b2494189d90 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/job/base/JobExecutionConfigurationTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/job/base/JobExecutionConfigurationTest.java @@ -75,7 +75,14 @@ public void testGetTriggerDelayTimesRecurring() { timerDefinition.setInterval(1L); Assertions.assertEquals(3, configuration.getTriggerDelayTimes(second * 5 + 10L, second * 3, second * 7).size()); Assertions.assertEquals(3, configuration.getTriggerDelayTimes(second * 5, second * 5, second * 7).size()); + timerDefinition.setStartTimeMs(1672531200000L); + timerDefinition.setIntervalUnit(IntervalUnit.MINUTE); + timerDefinition.setInterval(1L); + Assertions.assertArrayEquals(new Long[]{0L}, configuration.getTriggerDelayTimes(1672531800000L, 1672531200000L, 1672531800000L).toArray()); + + List expectDelayTimes = configuration.getTriggerDelayTimes(1672531200000L, 1672531200000L, 1672531850000L); + Assertions.assertArrayEquals(new Long[]{0L, 60L, 120L, 180L, 240L, 300L, 360L, 420L, 480L, 540L, 600L}, expectDelayTimes.toArray()); } @Test