From 33aef1dd5950c42f4d272091a3c275a5a68eaea9 Mon Sep 17 00:00:00 2001 From: Lorenzo Formenti Date: Sat, 18 May 2019 17:37:38 +0200 Subject: [PATCH 1/2] Implementation and tests for TaskSubmissionListener proposal --- .../org/jberet/se/BatchSEEnvironment.java | 30 +++++++++++++++- .../org/jberet/se/TaskSubmissionListener.java | 14 ++++++++ .../jberet/se/_private/SEBatchMessages.java | 3 ++ .../org/jberet/se/BatchSEEnvironmentTest.java | 34 +++++++++++++++++++ .../se/test/FailTaskSubmissionListener.java | 17 ++++++++++ .../src/test/resources/jberet.properties | 4 +++ 6 files changed, 101 insertions(+), 1 deletion(-) create mode 100644 jberet-se/src/main/java/org/jberet/se/TaskSubmissionListener.java create mode 100644 jberet-se/src/test/java/org/jberet/se/test/FailTaskSubmissionListener.java diff --git a/jberet-se/src/main/java/org/jberet/se/BatchSEEnvironment.java b/jberet-se/src/main/java/org/jberet/se/BatchSEEnvironment.java index 5b0677018..06adc03bd 100644 --- a/jberet-se/src/main/java/org/jberet/se/BatchSEEnvironment.java +++ b/jberet-se/src/main/java/org/jberet/se/BatchSEEnvironment.java @@ -57,6 +57,7 @@ public final class BatchSEEnvironment implements BatchEnvironment { private final TransactionManager tm; private final JobXmlResolver jobXmlResolver; private final JobExecutor executor; + private final TaskSubmissionListener taskSubmissionListener; static final String THREAD_POOL_TYPE = "thread-pool-type"; static final String THREAD_POOL_TYPE_CACHED = "Cached"; @@ -71,6 +72,8 @@ public final class BatchSEEnvironment implements BatchEnvironment { static final String THREAD_POOL_PRESTART_ALL_CORE_THREADS = "thread-pool-prestart-all-core-threads"; static final String THREAD_POOL_REJECTION_POLICY = "thread-pool-rejection-policy"; static final String THREAD_FACTORY = "thread-factory"; + + static final String TASK_SUBMISSION_LISTENER = "task-submission-listener"; public BatchSEEnvironment() { configProperties = new Properties(); @@ -101,6 +104,8 @@ protected int getMaximumPoolSize() { }; final ServiceLoader userJobXmlResolvers = ServiceLoader.load(JobXmlResolver.class, getClassLoader()); this.jobXmlResolver = new ChainedJobXmlResolver(userJobXmlResolvers, DEFAULT_JOB_XML_RESOLVERS); + + this.taskSubmissionListener = createTaskSubmissionListener(); } @Override @@ -119,7 +124,17 @@ public ArtifactFactory getArtifactFactory() { @Override public void submitTask(final JobTask task) { - executor.execute(task); + if(taskSubmissionListener == null) { + executor.execute(task); + } else { + try { + taskSubmissionListener.beforeSubmit(); + executor.execute(task); + taskSubmissionListener.afterSubmit(); + } catch (Exception e) { + throw SEBatchMessages.MESSAGES.taskSubmissionListenerError(e); + } + } } @Override @@ -237,4 +252,17 @@ ThreadPoolExecutor createThreadPoolExecutor() { throw SEBatchMessages.MESSAGES.failToGetConfigProperty(THREAD_POOL_TYPE, threadPoolType, null); } + + TaskSubmissionListener createTaskSubmissionListener() { + final String taskSubmissionListenerProp = configProperties.getProperty(TASK_SUBMISSION_LISTENER); + if (taskSubmissionListenerProp != null && !taskSubmissionListenerProp.isEmpty()) { + try { + final Class taskSubmissionListenerClass = getClassLoader().loadClass(taskSubmissionListenerProp.trim()); + return (TaskSubmissionListener) taskSubmissionListenerClass.getDeclaredConstructor().newInstance(); + } catch (Exception e) { + throw SEBatchMessages.MESSAGES.failToGetConfigProperty(TASK_SUBMISSION_LISTENER, taskSubmissionListenerProp, e); + } + } + return null; + } } \ No newline at end of file diff --git a/jberet-se/src/main/java/org/jberet/se/TaskSubmissionListener.java b/jberet-se/src/main/java/org/jberet/se/TaskSubmissionListener.java new file mode 100644 index 000000000..2a9726f10 --- /dev/null +++ b/jberet-se/src/main/java/org/jberet/se/TaskSubmissionListener.java @@ -0,0 +1,14 @@ +package org.jberet.se; + +/** + * Invoked around a task submission + * + * @author Formenti Lorenzo + */ +public interface TaskSubmissionListener { + + void beforeSubmit(); + + void afterSubmit(); + +} diff --git a/jberet-se/src/main/java/org/jberet/se/_private/SEBatchMessages.java b/jberet-se/src/main/java/org/jberet/se/_private/SEBatchMessages.java index 56d5c848e..6e0513039 100644 --- a/jberet-se/src/main/java/org/jberet/se/_private/SEBatchMessages.java +++ b/jberet-se/src/main/java/org/jberet/se/_private/SEBatchMessages.java @@ -35,4 +35,7 @@ public interface SEBatchMessages { @Message(id = 50004, value = "The job %s did not complete with batch status %s, exit status %s.") BatchRuntimeException jobDidNotComplete(String jobId, BatchStatus batchStatus, String exitStatus); + + @Message(id = 50005, value = "Error while invoking task submission listener method") + BatchRuntimeException taskSubmissionListenerError(@Cause Throwable th); } diff --git a/jberet-se/src/test/java/org/jberet/se/BatchSEEnvironmentTest.java b/jberet-se/src/test/java/org/jberet/se/BatchSEEnvironmentTest.java index b8e95ff1b..9dc31c51d 100644 --- a/jberet-se/src/test/java/org/jberet/se/BatchSEEnvironmentTest.java +++ b/jberet-se/src/test/java/org/jberet/se/BatchSEEnvironmentTest.java @@ -10,6 +10,7 @@ package org.jberet.se; +import java.lang.reflect.Field; import java.util.Properties; import java.util.concurrent.ExecutorService; import java.util.concurrent.RejectedExecutionHandler; @@ -18,6 +19,8 @@ import java.util.concurrent.TimeUnit; import javax.batch.operations.BatchRuntimeException; +import org.jberet.se.test.FailTaskSubmissionListener; +import org.jberet.spi.JobTask; import org.junit.Assert; import org.junit.Test; @@ -34,6 +37,7 @@ import static org.jberet.se.BatchSEEnvironment.THREAD_POOL_TYPE_FIXED; public class BatchSEEnvironmentTest { + private BatchSEEnvironment batchEnvironment = new BatchSEEnvironment(); @Test @@ -140,6 +144,36 @@ public void testCreateThreadPoolExecutor() throws Exception { System.out.printf("Got the expected %s%n", e); } } + + @Test + public void verifyTaskSubmissionListener() throws Exception { + + final JobTask task = new JobTask() { + @Override + public void run() { + System.out.printf("Everything's fine%n"); + } + + @Override + public int getRequiredRemainingPermits() { + return 0; + } + }; + + final Field failTaskSubmissionListener = BatchSEEnvironment.class.getDeclaredField("taskSubmissionListener"); + failTaskSubmissionListener.setAccessible(true); + failTaskSubmissionListener.set(batchEnvironment, new FailTaskSubmissionListener()); + + try { + batchEnvironment.submitTask(task); + failTaskSubmissionListener.set(batchEnvironment, null); + Assert.fail("Expecting RuntimeException, got a correct execution"); + } catch (BatchRuntimeException e) { + System.out.printf("Got the expected %s%n", e); + } + + failTaskSubmissionListener.set(batchEnvironment, null); + } private ThreadPoolExecutor verifyThreadPool(final int coreSize, final int maxSize, diff --git a/jberet-se/src/test/java/org/jberet/se/test/FailTaskSubmissionListener.java b/jberet-se/src/test/java/org/jberet/se/test/FailTaskSubmissionListener.java new file mode 100644 index 000000000..487c38ce3 --- /dev/null +++ b/jberet-se/src/test/java/org/jberet/se/test/FailTaskSubmissionListener.java @@ -0,0 +1,17 @@ +package org.jberet.se.test; + +import org.jberet.se.TaskSubmissionListener; + +public class FailTaskSubmissionListener implements TaskSubmissionListener { + + @Override + public void beforeSubmit() { + throw new RuntimeException(); + } + + @Override + public void afterSubmit() { + throw new RuntimeException(); + } + +} diff --git a/jberet-se/src/test/resources/jberet.properties b/jberet-se/src/test/resources/jberet.properties index c6a453725..36f8c852e 100644 --- a/jberet-se/src/test/resources/jberet.properties +++ b/jberet-se/src/test/resources/jberet.properties @@ -117,3 +117,7 @@ thread-factory = # Optional, fully-qualified name of a class that implements java.util.concurrent.RejectedExecutionHandler. # This property should not be needed in most cases. thread-pool-rejection-policy = + +# Optional, fully-qualified name of a class that implements org.jberet.se.TaskSubmissionListener. +# The object gets invoked around every job task submission +task-submission-listener = From bc3b641aa8c5481c40940dc3840f003ce234d06e Mon Sep 17 00:00:00 2001 From: Lorenzo Formenti Date: Sat, 18 May 2019 17:49:09 +0200 Subject: [PATCH 2/2] Fix for issue that potentially caused masking of errors thrown by the taskExecutor.execute() method. --- .../src/main/java/org/jberet/se/BatchSEEnvironment.java | 8 ++++++-- .../main/java/org/jberet/se/_private/SEBatchMessages.java | 4 ++-- 2 files changed, 8 insertions(+), 4 deletions(-) diff --git a/jberet-se/src/main/java/org/jberet/se/BatchSEEnvironment.java b/jberet-se/src/main/java/org/jberet/se/BatchSEEnvironment.java index 06adc03bd..0c23aaae1 100644 --- a/jberet-se/src/main/java/org/jberet/se/BatchSEEnvironment.java +++ b/jberet-se/src/main/java/org/jberet/se/BatchSEEnvironment.java @@ -129,10 +129,14 @@ public void submitTask(final JobTask task) { } else { try { taskSubmissionListener.beforeSubmit(); - executor.execute(task); + } catch (Exception e) { + throw SEBatchMessages.MESSAGES.taskSubmissionListenerError(e, "beforeSubmit"); + } + executor.execute(task); + try { taskSubmissionListener.afterSubmit(); } catch (Exception e) { - throw SEBatchMessages.MESSAGES.taskSubmissionListenerError(e); + throw SEBatchMessages.MESSAGES.taskSubmissionListenerError(e, "afterSubmit"); } } } diff --git a/jberet-se/src/main/java/org/jberet/se/_private/SEBatchMessages.java b/jberet-se/src/main/java/org/jberet/se/_private/SEBatchMessages.java index 6e0513039..95780611c 100644 --- a/jberet-se/src/main/java/org/jberet/se/_private/SEBatchMessages.java +++ b/jberet-se/src/main/java/org/jberet/se/_private/SEBatchMessages.java @@ -36,6 +36,6 @@ public interface SEBatchMessages { @Message(id = 50004, value = "The job %s did not complete with batch status %s, exit status %s.") BatchRuntimeException jobDidNotComplete(String jobId, BatchStatus batchStatus, String exitStatus); - @Message(id = 50005, value = "Error while invoking task submission listener method") - BatchRuntimeException taskSubmissionListenerError(@Cause Throwable th); + @Message(id = 50005, value = "Error while invoking task submission listener on %s method") + BatchRuntimeException taskSubmissionListenerError(@Cause Throwable th, String methodName); }