Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

JBERET-483 Implementation and tests for TaskSubmissionListener proposal #119

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 33 additions & 1 deletion jberet-se/src/main/java/org/jberet/se/BatchSEEnvironment.java
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ public final class BatchSEEnvironment implements BatchEnvironment {
private final TransactionManager tm;
private final JobXmlResolver jobXmlResolver;
private final JobExecutor executor;
private final TaskSubmissionListener taskSubmissionListener;

static final String THREAD_POOL_TYPE = "thread-pool-type";
static final String THREAD_POOL_TYPE_CACHED = "Cached";
Expand All @@ -71,6 +72,8 @@ public final class BatchSEEnvironment implements BatchEnvironment {
static final String THREAD_POOL_PRESTART_ALL_CORE_THREADS = "thread-pool-prestart-all-core-threads";
static final String THREAD_POOL_REJECTION_POLICY = "thread-pool-rejection-policy";
static final String THREAD_FACTORY = "thread-factory";

static final String TASK_SUBMISSION_LISTENER = "task-submission-listener";

public BatchSEEnvironment() {
configProperties = new Properties();
Expand Down Expand Up @@ -101,6 +104,8 @@ protected int getMaximumPoolSize() {
};
final ServiceLoader<JobXmlResolver> userJobXmlResolvers = ServiceLoader.load(JobXmlResolver.class, getClassLoader());
this.jobXmlResolver = new ChainedJobXmlResolver(userJobXmlResolvers, DEFAULT_JOB_XML_RESOLVERS);

this.taskSubmissionListener = createTaskSubmissionListener();
}

@Override
Expand All @@ -119,7 +124,21 @@ public ArtifactFactory getArtifactFactory() {

@Override
public void submitTask(final JobTask task) {
executor.execute(task);
if(taskSubmissionListener == null) {
executor.execute(task);
} else {
try {
taskSubmissionListener.beforeSubmit();
} catch (Exception e) {
throw SEBatchMessages.MESSAGES.taskSubmissionListenerError(e, "beforeSubmit");
}
executor.execute(task);
try {
taskSubmissionListener.afterSubmit();
} catch (Exception e) {
throw SEBatchMessages.MESSAGES.taskSubmissionListenerError(e, "afterSubmit");
}
}
}

@Override
Expand Down Expand Up @@ -237,4 +256,17 @@ ThreadPoolExecutor createThreadPoolExecutor() {

throw SEBatchMessages.MESSAGES.failToGetConfigProperty(THREAD_POOL_TYPE, threadPoolType, null);
}

TaskSubmissionListener createTaskSubmissionListener() {
final String taskSubmissionListenerProp = configProperties.getProperty(TASK_SUBMISSION_LISTENER);
if (taskSubmissionListenerProp != null && !taskSubmissionListenerProp.isEmpty()) {
try {
final Class<?> taskSubmissionListenerClass = getClassLoader().loadClass(taskSubmissionListenerProp.trim());
return (TaskSubmissionListener) taskSubmissionListenerClass.getDeclaredConstructor().newInstance();
} catch (Exception e) {
throw SEBatchMessages.MESSAGES.failToGetConfigProperty(TASK_SUBMISSION_LISTENER, taskSubmissionListenerProp, e);
}
}
return null;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package org.jberet.se;

/**
* Invoked around a task submission
*
* @author Formenti Lorenzo
*/
public interface TaskSubmissionListener {

void beforeSubmit();

void afterSubmit();

}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what parameters do these 2 methods need? In your application and use case, which info are needed that should be passed in as parameters? org.jberet.spi.JobTask seems a good one, and I think implementers will need to know what JobTask it is in order to determine the appropriate action.

javax.batch.runtime.context.JobContext may be another one, to provide more details about the current job execution.

Also note that jberet JobExecutor is used for not only job execution, but also Split/Flow/Partition executions. If we attach such a listener this way, it will be called upon submission of a job, a flow, split and step partition. Is this what you intend it to be?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I tried to write an answer to your points multiple times, but more I think of it more I think I underestimated the addition of this feature to the library.

Now, below my original answer.

You definitely have a point with your questions, i was eager to submit the proposal that I ended missing the goal I posed to myself for the submission itself.

What I meant was described in the issue JBERET-483: I wanted for the jberet-se module a clean way to introspect the task submissions (and only the tasks) and, in my specific case, propagate data that is generated during the costruction of the job instance and the job execution to child threads via the TheadFactory object.

Also, by reading your last intervention

If we attach such a listener this way, it will be called upon submission of a job, a flow, split and step partition.

By looking the the references in the code now I get what you meant, I don't wish to have a listener for those objects too, and I don't think having it would be useful. One easy way would be to test the type of task submitted, but I don't really like it.

Another I can think of would probably be to split the generation of the jobtask and the submission, leaving the invoker class the task of rollback the transaction if needed, but this impacts more the current mechanics of the AbstractJobOperator class, (that is jberet-core, so behavior of the EE component is impacted.

On the other hand, it's probably needed to create a new interface on the line of JobTaskInformation, that lets the implementing class expose a minimal amount of information without letting the caller edit the JobContext data.

I would really like to think more about this part and come back with a better pull request. For the moment I think I'll close this one.

Original file line number Diff line number Diff line change
Expand Up @@ -35,4 +35,7 @@ public interface SEBatchMessages {

@Message(id = 50004, value = "The job %s did not complete with batch status %s, exit status %s.")
BatchRuntimeException jobDidNotComplete(String jobId, BatchStatus batchStatus, String exitStatus);

@Message(id = 50005, value = "Error while invoking task submission listener on %s method")
BatchRuntimeException taskSubmissionListenerError(@Cause Throwable th, String methodName);
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

package org.jberet.se;

import java.lang.reflect.Field;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionHandler;
Expand All @@ -18,6 +19,8 @@
import java.util.concurrent.TimeUnit;
import javax.batch.operations.BatchRuntimeException;

import org.jberet.se.test.FailTaskSubmissionListener;
import org.jberet.spi.JobTask;
import org.junit.Assert;
import org.junit.Test;

Expand All @@ -34,6 +37,7 @@
import static org.jberet.se.BatchSEEnvironment.THREAD_POOL_TYPE_FIXED;

public class BatchSEEnvironmentTest {

private BatchSEEnvironment batchEnvironment = new BatchSEEnvironment();

@Test
Expand Down Expand Up @@ -140,6 +144,36 @@ public void testCreateThreadPoolExecutor() throws Exception {
System.out.printf("Got the expected %s%n", e);
}
}

@Test
public void verifyTaskSubmissionListener() throws Exception {

final JobTask task = new JobTask() {
@Override
public void run() {
System.out.printf("Everything's fine%n");
}

@Override
public int getRequiredRemainingPermits() {
return 0;
}
};

final Field failTaskSubmissionListener = BatchSEEnvironment.class.getDeclaredField("taskSubmissionListener");
failTaskSubmissionListener.setAccessible(true);
failTaskSubmissionListener.set(batchEnvironment, new FailTaskSubmissionListener());

try {
batchEnvironment.submitTask(task);
failTaskSubmissionListener.set(batchEnvironment, null);
Assert.fail("Expecting RuntimeException, got a correct execution");
} catch (BatchRuntimeException e) {
System.out.printf("Got the expected %s%n", e);
}

failTaskSubmissionListener.set(batchEnvironment, null);
}

private ThreadPoolExecutor verifyThreadPool(final int coreSize,
final int maxSize,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package org.jberet.se.test;

import org.jberet.se.TaskSubmissionListener;

public class FailTaskSubmissionListener implements TaskSubmissionListener {

@Override
public void beforeSubmit() {
throw new RuntimeException();
}

@Override
public void afterSubmit() {
throw new RuntimeException();
}

}
4 changes: 4 additions & 0 deletions jberet-se/src/test/resources/jberet.properties
Original file line number Diff line number Diff line change
Expand Up @@ -117,3 +117,7 @@ thread-factory =
# Optional, fully-qualified name of a class that implements java.util.concurrent.RejectedExecutionHandler.
# This property should not be needed in most cases.
thread-pool-rejection-policy =

# Optional, fully-qualified name of a class that implements org.jberet.se.TaskSubmissionListener.
# The object gets invoked around every job task submission
task-submission-listener =