Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions jbatch/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@
<dependency>
<groupId>com.ibm.jbatch.tck</groupId>
<artifactId>com.ibm.jbatch.tck</artifactId>
<version>1.1-b02</version>
<version>1.1-b03</version>
<scope>test</scope>
<exclusions>
<exclusion>
Expand All @@ -118,7 +118,7 @@
<dependency>
<groupId>com.ibm.jbatch.tck</groupId>
<artifactId>com.ibm.jbatch.tck.spi</artifactId>
<version>1.1-b02</version>
<version>1.1-b03</version>
<scope>test</scope>
<exclusions>
<exclusion>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,12 +67,16 @@ public abstract class BaseStepController implements ExecutionElementController {

protected StepContextImpl stepContext;
protected Step step;
protected String stepName;
protected StepStatus stepStatus;

protected BlockingQueue<PartitionDataWrapper> analyzerStatusQueue = null;

protected long rootJobExecutionId;

// Restart of partitioned steps needs to be handled specially
protected boolean restartAfterCompletion = false;

protected final BatchKernelService kernelService;
protected final PersistenceManagerService persistenceManagerService;
private final JobStatusManagerService statusManagerService;
Expand All @@ -90,7 +94,7 @@ protected BaseStepController(final RuntimeJobExecution jobExecution, final Step
throw new IllegalArgumentException("Step parameter to ctor cannot be null.");
}
this.step = step;

this.stepName = step.getId();
this.txService = servicesManager.service(TransactionManagementService.class);
this.kernelService = servicesManager.service(BatchKernelService.class);
this.persistenceManagerService = servicesManager.service(PersistenceManagerService.class);
Expand Down Expand Up @@ -311,6 +315,8 @@ private boolean shouldStepBeExecutedOnRestart() {
// boolean, but it should default to 'false', which is the spec'd default.
if (!Boolean.parseBoolean(step.getAllowStartIfComplete())) {
return false;
} else {
restartAfterCompletion = true;
}
}

Expand Down Expand Up @@ -364,6 +370,10 @@ protected void persistUserData() {
statusManagerService.updateStepStatus(stepStatus.getStepExecutionId(), stepStatus);
}

protected boolean isRestartExecution() {
return stepStatus.getStartCount() > 1;
}

protected void persistExitStatusAndEndTimestamp() {
stepStatus.setExitStatus(stepContext.getExitStatus());
statusManagerService.updateStepStatus(stepStatus.getStepExecutionId(), stepStatus);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,12 +71,32 @@ public class PartitionedStepController extends BaseStepController {

private PartitionReducer partitionReducerProxy = null;

private enum ExecutionType {
/**
* First execution of this step for the job instance (among all job executions)
*/
START,
/**
* Step previously executed but did not complete successfully, override=false so continue from previous partitions' checkpoints, etc.
*/
RESTART_NORMAL,
/**
* Step previously executed but did not complete successfully, override=true so start with an entire set of new partitions, checkpoints, etc.
*/
RESTART_OVERRIDE,
/**
* Step previously completed, but we are re-executing with an entire set of new partitions, checkpoints, etc.
*/
RESTART_AFTER_COMPLETION};
private ExecutionType executionType = null;

// On invocation this will be re-primed to reflect already-completed partitions from a previous execution.
int numPreviouslyCompleted = 0;

private PartitionAnalyzer analyzerProxy = null;

final List<JSLJob> subJobs = new ArrayList<JSLJob>();

protected List<StepListener> stepListeners = null;

List<BatchPartitionWorkUnit> completedWork = new ArrayList<BatchPartitionWorkUnit>();
Expand Down Expand Up @@ -180,7 +200,7 @@ private PartitionPlan generatePartitionPlan() {
int numPartitions = Integer.MIN_VALUE;
int numThreads;
Properties[] partitionProps = null;

if (partitionsAttr != null) {
try {
numPartitions = Integer.parseInt(partitionsAttr);
Expand Down Expand Up @@ -247,14 +267,47 @@ private PartitionPlan generatePartitionPlan() {
return plan;
}

private void calculateExecutionType() {
// We want to ignore override on the initial execution
if (isRestartExecution()) {
if (restartAfterCompletion) {
executionType = ExecutionType.RESTART_AFTER_COMPLETION;
} else if (plan.getPartitionsOverride()) {
executionType = ExecutionType.RESTART_OVERRIDE;
} else {
executionType = ExecutionType.RESTART_NORMAL;
}
} else {
executionType = ExecutionType.START;
}
}

private void validateNumberOfPartitions() {

int currentPlanSize = plan.getPartitions();

if (executionType == ExecutionType.RESTART_NORMAL) {
int previousPlanSize = stepStatus.getNumPartitions();
if (previousPlanSize > 0 && previousPlanSize != currentPlanSize) {
String msg = "On a normal restart, the plan on restart specified: " + currentPlanSize + " # of partitions, but the previous " +
"executions' plan specified a different number: " + previousPlanSize + " # of partitions. Failing job.";
throw new IllegalStateException(msg);
}
}

//persist the partition plan so on restart we have the same plan to reuse
stepStatus.setNumPartitions(currentPlanSize);
}


@Override
protected void invokeCoreStep() throws JobRestartException, JobStartException, JobExecutionAlreadyCompleteException, JobExecutionNotMostRecentException {

this.plan = this.generatePartitionPlan();

//persist the partition plan so on restart we have the same plan to reuse
stepStatus.setNumPartitions(plan.getPartitions());
calculateExecutionType();

validateNumberOfPartitions();

/* When true is specified, the partition count from the current run
* is used and all results from past partitions are discarded. Any
Expand All @@ -263,7 +316,7 @@ protected void invokeCoreStep() throws JobRestartException, JobStartException, J
* rollbackPartitionedStep method is invoked during restart before any
* partitions begin processing to provide a cleanup hook.
*/
if (plan.getPartitionsOverride()) {
if (executionType == ExecutionType.RESTART_OVERRIDE) {
if (this.partitionReducerProxy != null) {
try {
this.partitionReducerProxy.rollbackPartitionedStep();
Expand Down Expand Up @@ -303,9 +356,14 @@ private void buildSubJobBatchWorkUnits() throws JobRestartException, JobStartExc
PartitionsBuilderConfig config =
new PartitionsBuilderConfig(subJobs, partitionProperties, analyzerStatusQueue, completedWorkQueue, jobExecutionImpl.getExecutionId());
// Then build all the subjobs but do not start them yet
if (stepStatus.getStartCount() > 1 && !plan.getPartitionsOverride()) {
if (executionType == ExecutionType.RESTART_NORMAL) {
parallelBatchWorkUnits = kernelService.buildOnRestartParallelPartitions(config, jobExecutionImpl.getJobContext(), stepContext);
} else {
// This case includes RESTART_OVERRIDE and RESTART_AFTER_COMPLETION.
//
// So we're just going to create new "subjob" job instances in the DB in these cases,
// and we'll have to make sure we're dealing with the correct ones, say in a subsequent "normal" restart
// (of the current execution which is itself a restart)
parallelBatchWorkUnits = kernelService.buildNewParallelPartitions(config, jobExecutionImpl.getJobContext(), stepContext);
}

Expand All @@ -324,6 +382,12 @@ private void executeAndWaitForCompletion() throws JobRestartException {
int numCurrentCompleted = 0;
int numCurrentSubmitted = 0;


// All partitions have already completed on a previous execution.
if (numTotalForThisExecution == 0) {
return;
}

//Start up to to the max num we are allowed from the num threads attribute
for (int i = 0; i < this.threads && i < numTotalForThisExecution; i++, numCurrentSubmitted++) {
final BatchWorkUnit workUnit = parallelBatchWorkUnits.get(i);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,9 @@ public List<BatchPartitionWorkUnit> buildNewParallelPartitions(final PartitionsB
return batchWorkUnits;
}

/*
* There are some assumptions that all partition subjobs have associated DB entries
*/
@Override
public List<BatchPartitionWorkUnit> buildOnRestartParallelPartitions(final PartitionsBuilderConfig config, final JobContextImpl jc, final StepContextImpl sc)
throws JobRestartException, JobExecutionAlreadyCompleteException, JobExecutionNotMostRecentException {
Expand All @@ -201,7 +204,7 @@ public List<BatchPartitionWorkUnit> buildOnRestartParallelPartitions(final Parti
final Properties partitionProps = (partitionProperties == null) ? null : partitionProperties[instance];

try {
final long execId = getMostRecentExecutionId(parallelJob);
final long execId = getMostRecentSubJobExecutionId(parallelJob);
final RuntimeJobExecution jobExecution;
try {
jobExecution = JobExecutionHelper.restartPartition(servicesManager, execId, parallelJob, partitionProps);
Expand Down Expand Up @@ -244,15 +247,13 @@ public BatchFlowInSplitWorkUnit buildNewFlowInSplitWorkUnit(final FlowInSplitBui
return batchWork;
}

private long getMostRecentExecutionId(final JSLJob jobModel) {
private long getMostRecentSubJobExecutionId(JSLJob jobModel) {

//There can only be one instance associated with a subjob's id since it is generated from an unique
//job instance id. So there should be no way to directly start a subjob with particular
final List<Long> instanceIds = persistenceService.jobOperatorGetJobInstanceIds(jobModel.getId(), 0, 2);
// Pick off the first, knowing the ordering. There could be more than one.
List<Long> instanceIds = persistenceService.jobOperatorGetJobInstanceIds(jobModel.getId(), 0, 1);

// Maybe we should blow up on '0' too?
if (instanceIds.size() > 1) {
throw new IllegalStateException("Found " + instanceIds.size() + " entries for instance id = " + jobModel.getId() + ", which should not have happened. Blowing up.");
if (instanceIds.size() == 0) {
throw new IllegalStateException("Did not find an entry for job name = " + jobModel.getId());
}

final List<InternalJobExecution> partitionExecs = persistenceService.jobOperatorGetJobExecutions(instanceIds.get(0));
Expand All @@ -271,7 +272,7 @@ public BatchFlowInSplitWorkUnit buildOnRestartFlowInSplitWorkUnit(final FlowInSp
throws JobRestartException, JobExecutionAlreadyCompleteException, JobExecutionNotMostRecentException {

final JSLJob jobModel = config.getJobModel();
final long execId = getMostRecentExecutionId(jobModel);
final long execId = getMostRecentSubJobExecutionId(jobModel);
final RuntimeFlowInSplitExecution jobExecution;
try {
jobExecution = JobExecutionHelper.restartFlowInSplit(servicesManager, execId, jobModel);
Expand Down