From 72a63e6f37fbcb9c45cb813a9937a2cf397ab463 Mon Sep 17 00:00:00 2001 From: Lorenzo Formenti Date: Mon, 27 May 2019 22:37:18 +0200 Subject: [PATCH 1/3] Fixed case that caused ChunkRunner to spawn a huge amount of threads named "chunk-checkpoint-timer" when time-limit attribute was defined big enough in very quick chunk executions. --- .../jberet/runtime/runner/ChunkRunner.java | 30 ++++++++----------- 1 file changed, 12 insertions(+), 18 deletions(-) diff --git a/jberet-core/src/main/java/org/jberet/runtime/runner/ChunkRunner.java b/jberet-core/src/main/java/org/jberet/runtime/runner/ChunkRunner.java index 5530bf17b..fd8270fb5 100644 --- a/jberet-core/src/main/java/org/jberet/runtime/runner/ChunkRunner.java +++ b/jberet-core/src/main/java/org/jberet/runtime/runner/ChunkRunner.java @@ -13,8 +13,6 @@ import java.io.Serializable; import java.util.ArrayList; import java.util.List; -import java.util.Timer; -import java.util.TimerTask; import java.util.concurrent.atomic.AtomicBoolean; import javax.batch.api.chunk.CheckpointAlgorithm; import javax.batch.api.chunk.ItemProcessor; @@ -313,7 +311,7 @@ private void readProcessWriteItems() throws Exception { processingInfo.chunkState == ChunkState.RETRYING || processingInfo.chunkState == ChunkState.TO_END_RETRY) { if (processingInfo.chunkState == ChunkState.TO_START_NEW || processingInfo.chunkState == ChunkState.TO_END_RETRY) { - processingInfo.reset(); + processingInfo.reset(timeLimit); } //if during Chunk RETRYING, and an item is skipped, the ut is still active so no need to begin a new one if (tm.getStatus() != Status.STATUS_ACTIVE) { @@ -505,16 +503,8 @@ private void checkIfEndRetry(final ProcessingInfo processingInfo) { } private void beginCheckpoint(final ProcessingInfo processingInfo) throws Exception { - if (checkpointPolicy.equals("item")) { - if (timeLimit > 0) { - final Timer timer = new Timer("chunk-checkpoint-timer", true); - timer.schedule(new TimerTask() { - @Override - public void run() { - processingInfo.timerExpired = true; - } - }, timeLimit * 1000); - } + if (checkpointPolicy.equals("item") && timeLimit > 0) { + processingInfo.expiresAt = System.currentTimeMillis() + ( timeLimit * 1000 ); } //if chunk is already RETRYING, do not change it to RUNNING if (processingInfo.chunkState == ChunkState.TO_RETRY) { @@ -542,7 +532,7 @@ private boolean isReadyToCheckpoint(final ProcessingInfo processingInfo) throws return true; } if (timeLimit > 0) { - return processingInfo.timerExpired; + return processingInfo.timerExpired(); } return false; } @@ -838,7 +828,7 @@ private static final class ProcessingInfo { */ int count; - boolean timerExpired; + long expiresAt; ItemState itemState = ItemState.RUNNING; ChunkState chunkState = ChunkState.TO_START_NEW; @@ -858,9 +848,9 @@ private static final class ProcessingInfo { */ Integer failurePoint; - private void reset() { + private void reset(final int timeLimit) { count = 0; - timerExpired = false; + expiresAt = System.currentTimeMillis() + ( timeLimit * 1000 ); itemState = ItemState.RUNNING; chunkState = ChunkState.RUNNING; failurePoint = null; @@ -871,12 +861,16 @@ private boolean toStopItem() { itemState == ItemState.TO_RETRY_READ || itemState == ItemState.TO_RETRY_PROCESS || itemState == ItemState.TO_RETRY_WRITE; } + + private boolean timerExpired() { + return System.currentTimeMillis() > expiresAt; + } @Override public String toString() { final StringBuilder sb = new StringBuilder("ProcessingInfo{"); sb.append("count=").append(count); - sb.append(", timerExpired=").append(timerExpired); + sb.append(", timerExpired=").append(timerExpired()); sb.append(", itemState=").append(itemState); sb.append(", chunkState=").append(chunkState); sb.append(", checkpointPosition=").append(checkpointPosition); From 421b808908b1d2918e6b61cc3974035fa20bb6db Mon Sep 17 00:00:00 2001 From: Lorenzo Formenti Date: Tue, 28 May 2019 21:54:57 +0200 Subject: [PATCH 2/3] Moved from currentTimeMillis() to nanoTime() --- .../main/java/org/jberet/runtime/runner/ChunkRunner.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/jberet-core/src/main/java/org/jberet/runtime/runner/ChunkRunner.java b/jberet-core/src/main/java/org/jberet/runtime/runner/ChunkRunner.java index fd8270fb5..e8b311c0a 100644 --- a/jberet-core/src/main/java/org/jberet/runtime/runner/ChunkRunner.java +++ b/jberet-core/src/main/java/org/jberet/runtime/runner/ChunkRunner.java @@ -504,7 +504,7 @@ private void checkIfEndRetry(final ProcessingInfo processingInfo) { private void beginCheckpoint(final ProcessingInfo processingInfo) throws Exception { if (checkpointPolicy.equals("item") && timeLimit > 0) { - processingInfo.expiresAt = System.currentTimeMillis() + ( timeLimit * 1000 ); + processingInfo.expiresAt = timeLimit * 1_000_000 + System.nanoTime(); } //if chunk is already RETRYING, do not change it to RUNNING if (processingInfo.chunkState == ChunkState.TO_RETRY) { @@ -850,7 +850,7 @@ private static final class ProcessingInfo { private void reset(final int timeLimit) { count = 0; - expiresAt = System.currentTimeMillis() + ( timeLimit * 1000 ); + expiresAt = timeLimit * 1_000_000 + System.nanoTime(); itemState = ItemState.RUNNING; chunkState = ChunkState.RUNNING; failurePoint = null; @@ -863,7 +863,7 @@ private boolean toStopItem() { } private boolean timerExpired() { - return System.currentTimeMillis() > expiresAt; + return System.nanoTime() > expiresAt; } @Override From 8ebd7bd70a97e02e55f85ca395b93e11d1d960be Mon Sep 17 00:00:00 2001 From: Lorenzo Formenti Date: Tue, 28 May 2019 21:57:14 +0200 Subject: [PATCH 3/3] Forced long literal --- .../src/main/java/org/jberet/runtime/runner/ChunkRunner.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/jberet-core/src/main/java/org/jberet/runtime/runner/ChunkRunner.java b/jberet-core/src/main/java/org/jberet/runtime/runner/ChunkRunner.java index e8b311c0a..03fdfd5c0 100644 --- a/jberet-core/src/main/java/org/jberet/runtime/runner/ChunkRunner.java +++ b/jberet-core/src/main/java/org/jberet/runtime/runner/ChunkRunner.java @@ -504,7 +504,7 @@ private void checkIfEndRetry(final ProcessingInfo processingInfo) { private void beginCheckpoint(final ProcessingInfo processingInfo) throws Exception { if (checkpointPolicy.equals("item") && timeLimit > 0) { - processingInfo.expiresAt = timeLimit * 1_000_000 + System.nanoTime(); + processingInfo.expiresAt = timeLimit * 1_000_000L + System.nanoTime(); } //if chunk is already RETRYING, do not change it to RUNNING if (processingInfo.chunkState == ChunkState.TO_RETRY) { @@ -850,7 +850,7 @@ private static final class ProcessingInfo { private void reset(final int timeLimit) { count = 0; - expiresAt = timeLimit * 1_000_000 + System.nanoTime(); + expiresAt = timeLimit * 1_000_000L + System.nanoTime(); itemState = ItemState.RUNNING; chunkState = ChunkState.RUNNING; failurePoint = null;