From d19da573a08b75c902271961e97e37bafd44d7e7 Mon Sep 17 00:00:00 2001 From: Calvin Kirs Date: Tue, 19 Nov 2024 16:44:49 +0800 Subject: [PATCH] [Fix](Job)Fix the Calculation of the First Trigger Time and Add a Single-Time Scheduling Compensation Logic ### Background: The previous scheduling logic for calculating the first trigger time had the following issues: If the current time (currentTimeMs) exceeds the start of the time window (windowStartTimeMs), the first trigger time could be skipped, causing some tasks to be missed. Minor delays during code execution (e.g., between window initialization and scheduling logic execution) could result in tasks being missed within the active time window. ### Solution: - Adjustment of First Trigger Time Logic: When the initially calculated firstTriggerTime is less than or equal to the currentTimeMs, compute the number of missed intervals and directly adjust to the largest trigger time that is less than the currentTimeMs. - Single-Time Compensation Logic: Introduced a compensation mechanism to handle the slight delay caused by code execution. This ensures that tasks missed during the initialization phase are correctly scheduled. The compensation is explicitly single-time to avoid impacting the normal scheduling logic. --- .../doris/job/base/JobExecutionConfiguration.java | 5 +++-- .../job/base/JobExecutionConfigurationTest.java | 12 ++++++++++-- 2 files changed, 13 insertions(+), 4 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/base/JobExecutionConfiguration.java b/fe/fe-core/src/main/java/org/apache/doris/job/base/JobExecutionConfiguration.java index 301222d5434ea3..4c6ef4d2037f86 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/base/JobExecutionConfiguration.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/base/JobExecutionConfiguration.java @@ -172,9 +172,10 @@ private List getExecutionDelaySeconds(long windowStartTimeMs, long windowE long firstTriggerTime = windowStartTimeMs + (intervalMs - ((windowStartTimeMs - startTimeMs) % intervalMs)) % intervalMs; if (firstTriggerTime < currentTimeMs) { - firstTriggerTime += intervalMs; + // Calculate how many intervals to add to get the largest trigger time < currentTimeMs + long intervalsToAdd = (currentTimeMs - firstTriggerTime) / intervalMs; + firstTriggerTime += intervalsToAdd * intervalMs; } - if (firstTriggerTime > windowEndTimeMs) { return timestamps; // Return an empty list if there won't be any trigger time } diff --git a/fe/fe-core/src/test/java/org/apache/doris/job/base/JobExecutionConfigurationTest.java b/fe/fe-core/src/test/java/org/apache/doris/job/base/JobExecutionConfigurationTest.java index 24c486baff81bc..cce0a93c01daf8 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/job/base/JobExecutionConfigurationTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/job/base/JobExecutionConfigurationTest.java @@ -60,14 +60,22 @@ public void testGetTriggerDelayTimesRecurring() { Assertions.assertArrayEquals(new Long[]{100L, 700L}, delayTimes.toArray()); delayTimes = configuration.getTriggerDelayTimes( 200000L, 0L, 1100000L); - Assertions.assertEquals(1, delayTimes.size()); - Assertions.assertArrayEquals(new Long[]{500L}, delayTimes.toArray()); + Assertions.assertEquals(2, delayTimes.size()); + Assertions.assertArrayEquals(new Long[]{0L, 500L}, delayTimes.toArray()); delayTimes = configuration.getTriggerDelayTimes( 1001000L, 0L, 1000000L); Assertions.assertEquals(1, delayTimes.size()); timerDefinition.setStartTimeMs(2000L); timerDefinition.setIntervalUnit(IntervalUnit.SECOND); Assertions.assertArrayEquals(new Long[]{2L, 12L}, configuration.getTriggerDelayTimes(100000L, 100000L, 120000L).toArray()); + + timerDefinition.setIntervalUnit(IntervalUnit.SECOND); + long second = 1000L; + timerDefinition.setStartTimeMs(second); + timerDefinition.setInterval(1L); + Assertions.assertEquals(3, configuration.getTriggerDelayTimes(second * 5 + 10L, second * 3, second * 7).size()); + Assertions.assertEquals(3, configuration.getTriggerDelayTimes(second * 5, second * 5, second * 7).size()); + } @Test