diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/TimeUtils.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/TimeUtils.java index e7066846c30919..d88971a6e72bde 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/util/TimeUtils.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/TimeUtils.java @@ -257,6 +257,17 @@ public static long timeStringToLong(String timeStr) { return d.getTime(); } + /** + * Converts a millisecond timestamp to a second-level timestamp. + * + * @param timestamp The millisecond timestamp to be converted. + * @return The timestamp rounded to the nearest second (in milliseconds). + */ + public static long convertToSecondTimestamp(long timestamp) { + // Divide by 1000 to convert to seconds, then multiply by 1000 to return to milliseconds with no fractional part + return (timestamp / 1000) * 1000; + } + public static long timeStringToLong(String timeStr, TimeZone timeZone) { DateTimeFormatter dateFormatTimeZone = getDatetimeFormatWithTimeZone(); dateFormatTimeZone.withZone(timeZone.toZoneId()); diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/base/JobExecutionConfiguration.java b/fe/fe-core/src/main/java/org/apache/doris/job/base/JobExecutionConfiguration.java index 4c6ef4d2037f86..d564b1143122ec 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/base/JobExecutionConfiguration.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/base/JobExecutionConfiguration.java @@ -155,7 +155,7 @@ private Long queryDelayTimeSecond(Long currentTimeMs, Long startTimeMs) { return 0L; } - return (startTimeMs - currentTimeMs) / 1000; + return (startTimeMs * 1000 / 1000 - currentTimeMs) / 1000; } // Returns a list of delay times in seconds for executing the job within the specified window diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/base/TimerDefinition.java b/fe/fe-core/src/main/java/org/apache/doris/job/base/TimerDefinition.java index 9068a18f693e12..96181877b9a568 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/base/TimerDefinition.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/base/TimerDefinition.java @@ -17,6 +17,7 @@ package org.apache.doris.job.base; +import org.apache.doris.common.util.TimeUtils; import org.apache.doris.job.common.IntervalUnit; import com.google.gson.annotations.SerializedName; @@ -40,11 +41,15 @@ public class TimerDefinition { public void checkParams() { if (null == startTimeMs) { - startTimeMs = System.currentTimeMillis() + intervalUnit.getIntervalMs(interval); + long currentTimeMs = TimeUtils.convertToSecondTimestamp(System.currentTimeMillis()); + startTimeMs = currentTimeMs + intervalUnit.getIntervalMs(interval); } if (null != endTimeMs && endTimeMs < startTimeMs) { throw new IllegalArgumentException("endTimeMs must be greater than the start time"); } + if (null != endTimeMs) { + endTimeMs = TimeUtils.convertToSecondTimestamp(endTimeMs); + } if (null != intervalUnit) { if (null == interval) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/scheduler/JobScheduler.java b/fe/fe-core/src/main/java/org/apache/doris/job/scheduler/JobScheduler.java index 7f8b39f1e66dc7..d6d1d09c503753 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/scheduler/JobScheduler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/scheduler/JobScheduler.java @@ -84,7 +84,8 @@ public void start() { taskDisruptorGroupManager = new TaskDisruptorGroupManager(); taskDisruptorGroupManager.init(); this.timerJobDisruptor = taskDisruptorGroupManager.getDispatchDisruptor(); - latestBatchSchedulerTimerTaskTimeMs = System.currentTimeMillis(); + long currentTimeMs = TimeUtils.convertToSecondTimestamp(System.currentTimeMillis()); + latestBatchSchedulerTimerTaskTimeMs = currentTimeMs; batchSchedulerTimerJob(); cycleSystemSchedulerTasks(); } @@ -94,7 +95,8 @@ public void start() { * Jobs will be re-registered after the task is completed */ private void cycleSystemSchedulerTasks() { - log.info("re-register system scheduler timer tasks" + TimeUtils.longToTimeString(System.currentTimeMillis())); + log.info("re-register system scheduler timer tasks, time is " + TimeUtils + .longToTimeStringWithms(System.currentTimeMillis())); timerTaskScheduler.newTimeout(timeout -> { batchSchedulerTimerJob(); cycleSystemSchedulerTasks(); @@ -144,7 +146,9 @@ public void close() throws IOException { private void cycleTimerJobScheduler(T job, long startTimeWindowMs) { - List delaySeconds = job.getJobConfig().getTriggerDelayTimes(System.currentTimeMillis(), + long currentTimeMs = TimeUtils.convertToSecondTimestamp(System.currentTimeMillis()); + startTimeWindowMs = TimeUtils.convertToSecondTimestamp(startTimeWindowMs); + List delaySeconds = job.getJobConfig().getTriggerDelayTimes(currentTimeMs, startTimeWindowMs, latestBatchSchedulerTimerTaskTimeMs); if (CollectionUtils.isNotEmpty(delaySeconds)) { delaySeconds.forEach(delaySecond -> { @@ -185,7 +189,8 @@ private void executeTimerJobIdsWithinLastTenMinutesWindow() { long lastTimeWindowMs = latestBatchSchedulerTimerTaskTimeMs; if (latestBatchSchedulerTimerTaskTimeMs < System.currentTimeMillis()) { - this.latestBatchSchedulerTimerTaskTimeMs = System.currentTimeMillis(); + long currentTimeMs = TimeUtils.convertToSecondTimestamp(System.currentTimeMillis()); + this.latestBatchSchedulerTimerTaskTimeMs = currentTimeMs; } this.latestBatchSchedulerTimerTaskTimeMs += BATCH_SCHEDULER_INTERVAL_MILLI_SECONDS; if (jobMap.isEmpty()) { diff --git a/fe/fe-core/src/test/java/org/apache/doris/job/base/JobExecutionConfigurationTest.java b/fe/fe-core/src/test/java/org/apache/doris/job/base/JobExecutionConfigurationTest.java index cce0a93c01daf8..163b2494189d90 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/job/base/JobExecutionConfigurationTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/job/base/JobExecutionConfigurationTest.java @@ -75,7 +75,14 @@ public void testGetTriggerDelayTimesRecurring() { timerDefinition.setInterval(1L); Assertions.assertEquals(3, configuration.getTriggerDelayTimes(second * 5 + 10L, second * 3, second * 7).size()); Assertions.assertEquals(3, configuration.getTriggerDelayTimes(second * 5, second * 5, second * 7).size()); + timerDefinition.setStartTimeMs(1672531200000L); + timerDefinition.setIntervalUnit(IntervalUnit.MINUTE); + timerDefinition.setInterval(1L); + Assertions.assertArrayEquals(new Long[]{0L}, configuration.getTriggerDelayTimes(1672531800000L, 1672531200000L, 1672531800000L).toArray()); + + List expectDelayTimes = configuration.getTriggerDelayTimes(1672531200000L, 1672531200000L, 1672531850000L); + Assertions.assertArrayEquals(new Long[]{0L, 60L, 120L, 180L, 240L, 300L, 360L, 420L, 480L, 540L, 600L}, expectDelayTimes.toArray()); } @Test