Skip to content

Commit

Permalink
[fix](job)Fix millisecond offset issue in time window scheduling trig…
Browse files Browse the repository at this point in the history
…ger time calculation

### Abstract:
In the current time window scheduling logic, the calculation of trigger times was not strictly aligned to the second level, which could lead to millisecond offsets. This offset caused issues such as consecutive trigger times at 14:56:59 and 14:57:00, disrupting the correctness of the scheduling.

This PR optimizes the calculation of trigger times to ensure that time points are strictly aligned to the second level, preventing the accumulation of millisecond errors.

### Issue Description:

Under a specified window (e.g., 14:50:00 to 14:59:00) and a fixed interval (e.g., every minute), the scheduler generated erroneous trigger times such as:

```
| 2024-12-04 14:56:59 |
| 2024-12-04 14:57:00 |
| 2024-12-04 14:57:59 |
| 2024-12-04 14:58:00 |
```
#### Cause:
The current firstTriggerTime and the loop calculation did not strictly align trigger times to the second level, resulting in erroneous trigger points due to floating-point or millisecond offset accumulation. The end condition for the time window was not aligned to the second level, which could lead to additional trigger times being included.

### Fix:
Modification 1: Strictly align the trigger time to the second level.
  • Loading branch information
CalvinKirs committed Dec 9, 2024
1 parent 6b4b3cb commit c9f973f
Show file tree
Hide file tree
Showing 5 changed files with 34 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,17 @@ public static long timeStringToLong(String timeStr) {
return d.getTime();
}

/**
* Converts a millisecond timestamp to a second-level timestamp.
*
* @param timestamp The millisecond timestamp to be converted.
* @return The timestamp rounded to the nearest second (in milliseconds).
*/
public static long convertToSecondTimestamp(long timestamp) {
// Divide by 1000 to convert to seconds, then multiply by 1000 to return to milliseconds with no fractional part
return (timestamp / 1000) * 1000;
}

public static long timeStringToLong(String timeStr, TimeZone timeZone) {
DateTimeFormatter dateFormatTimeZone = getDatetimeFormatWithTimeZone();
dateFormatTimeZone.withZone(timeZone.toZoneId());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ private Long queryDelayTimeSecond(Long currentTimeMs, Long startTimeMs) {
return 0L;
}

return (startTimeMs - currentTimeMs) / 1000;
return (startTimeMs * 1000 / 1000 - currentTimeMs) / 1000;
}

// Returns a list of delay times in seconds for executing the job within the specified window
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.doris.job.base;

import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.job.common.IntervalUnit;

import com.google.gson.annotations.SerializedName;
Expand All @@ -40,11 +41,15 @@ public class TimerDefinition {

public void checkParams() {
if (null == startTimeMs) {
startTimeMs = System.currentTimeMillis() + intervalUnit.getIntervalMs(interval);
long currentTimeMs = TimeUtils.convertToSecondTimestamp(System.currentTimeMillis());
startTimeMs = currentTimeMs + intervalUnit.getIntervalMs(interval);
}
if (null != endTimeMs && endTimeMs < startTimeMs) {
throw new IllegalArgumentException("endTimeMs must be greater than the start time");
}
if (null != endTimeMs) {
endTimeMs = TimeUtils.convertToSecondTimestamp(endTimeMs);
}

if (null != intervalUnit) {
if (null == interval) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,8 @@ public void start() {
taskDisruptorGroupManager = new TaskDisruptorGroupManager();
taskDisruptorGroupManager.init();
this.timerJobDisruptor = taskDisruptorGroupManager.getDispatchDisruptor();
latestBatchSchedulerTimerTaskTimeMs = System.currentTimeMillis();
long currentTimeMs = TimeUtils.convertToSecondTimestamp(System.currentTimeMillis());
latestBatchSchedulerTimerTaskTimeMs = currentTimeMs;
batchSchedulerTimerJob();
cycleSystemSchedulerTasks();
}
Expand All @@ -94,7 +95,8 @@ public void start() {
* Jobs will be re-registered after the task is completed
*/
private void cycleSystemSchedulerTasks() {
log.info("re-register system scheduler timer tasks" + TimeUtils.longToTimeString(System.currentTimeMillis()));
log.info("re-register system scheduler timer tasks, time is " + TimeUtils
.longToTimeStringWithms(System.currentTimeMillis()));
timerTaskScheduler.newTimeout(timeout -> {
batchSchedulerTimerJob();
cycleSystemSchedulerTasks();
Expand Down Expand Up @@ -144,7 +146,9 @@ public void close() throws IOException {


private void cycleTimerJobScheduler(T job, long startTimeWindowMs) {
List<Long> delaySeconds = job.getJobConfig().getTriggerDelayTimes(System.currentTimeMillis(),
long currentTimeMs = TimeUtils.convertToSecondTimestamp(System.currentTimeMillis());
startTimeWindowMs = TimeUtils.convertToSecondTimestamp(startTimeWindowMs);
List<Long> delaySeconds = job.getJobConfig().getTriggerDelayTimes(currentTimeMs,
startTimeWindowMs, latestBatchSchedulerTimerTaskTimeMs);
if (CollectionUtils.isNotEmpty(delaySeconds)) {
delaySeconds.forEach(delaySecond -> {
Expand Down Expand Up @@ -185,7 +189,8 @@ private void executeTimerJobIdsWithinLastTenMinutesWindow() {

long lastTimeWindowMs = latestBatchSchedulerTimerTaskTimeMs;
if (latestBatchSchedulerTimerTaskTimeMs < System.currentTimeMillis()) {
this.latestBatchSchedulerTimerTaskTimeMs = System.currentTimeMillis();
long currentTimeMs = TimeUtils.convertToSecondTimestamp(System.currentTimeMillis());
this.latestBatchSchedulerTimerTaskTimeMs = currentTimeMs;
}
this.latestBatchSchedulerTimerTaskTimeMs += BATCH_SCHEDULER_INTERVAL_MILLI_SECONDS;
if (jobMap.isEmpty()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,14 @@ public void testGetTriggerDelayTimesRecurring() {
timerDefinition.setInterval(1L);
Assertions.assertEquals(3, configuration.getTriggerDelayTimes(second * 5 + 10L, second * 3, second * 7).size());
Assertions.assertEquals(3, configuration.getTriggerDelayTimes(second * 5, second * 5, second * 7).size());
timerDefinition.setStartTimeMs(1672531200000L);
timerDefinition.setIntervalUnit(IntervalUnit.MINUTE);
timerDefinition.setInterval(1L);
Assertions.assertArrayEquals(new Long[]{0L}, configuration.getTriggerDelayTimes(1672531800000L, 1672531200000L, 1672531800000L).toArray());

List<Long> expectDelayTimes = configuration.getTriggerDelayTimes(1672531200000L, 1672531200000L, 1672531850000L);

Assertions.assertArrayEquals(new Long[]{0L, 60L, 120L, 180L, 240L, 300L, 360L, 420L, 480L, 540L, 600L}, expectDelayTimes.toArray());
}

@Test
Expand Down

0 comments on commit c9f973f

Please sign in to comment.