Spring Batch is a battle-tested Java framework that makes it easy to write batch applications. Batch applications involve reliably and efficiently processing large volumes of data to and from various data sources (files, databases, messaging middleware, and so on). Spring Batch excels at this and provides the necessary foundation to meet the stringent requirements of batch applications. It provides mechanisms for common tasks such as task orchestration, partitioning, and restart.
Spring Batch jobs often require boilerplate code, which this library extracts out to promote reusability. Common components of a Spring Batch job are defined as beans and can be reused across multiple jobs. See usage in Spring Batch Job implemented as Spring Cloud Task and Spring Rest service.
- Provides common components and utility classes to easily create Spring Batch jobs.
- Provides opinionated default configurations for Spring Batch jobs.
- Supports partitioning of jobs to process data concurrently.
- Autoconfigures fault tolerance with intelligent defaults to retry and recover from transient failures.
- Processes records in chunks; if the job fails midway, it can be restarted from the last failed chunk without re-processing already processed records.
- Supports force restarting already completed jobs.
- Supports skipping records in case of exceptions.
- Supports logging of job and step execution events.
Following are the classes provided by this library.

- `BatchConfiguration`: Extends `DefaultBatchConfiguration` and defines the default configuration for Spring Batch jobs. It is autoconfigured by Spring Boot.
- `AbstractJobExecutor`: Extendable by the consumer application's job executor to execute a job with a run id incrementer, to force restart the job in case it completed successfully in the last execution.
- `AbstractPartitioner`: Provides a common implementation for partitioning Spring Batch jobs. Consumer applications need to extend this class and implement the `partitioningList` method to return a `List` of partitioning candidate `String`s.
- `JobConfigurationSupport`: Extendable by the consumer application to define new simple and partitioned jobs with default configurations. The defaults can be overridden per job by overriding the respective methods, or globally by defining a new bean for the respective component.
- `LoggingJobListener`: Provides a default implementation of the Spring Batch job listener, which does nothing but logging.
- `LoggingStepListener`: Provides a default implementation of the Spring Batch step listeners, which does nothing but logging.
- `MongoAggregationPagingItemReader`: Custom Mongo paging item reader using an aggregation pipeline and pagination.
- `MongoUpsertItemWriter`: Custom Mongo item writer for upsert operations.
- `ListFlattenerKafkaItemWriter`: Custom Kafka writer to write a `List` of items to Kafka. Can be used in cases where the last `Processor` returns a `List` of items instead of a single item.
- `StepStatus`: Utility class to define custom step statuses; can be enhanced to add more statuses.
- `SkipRecordException`: Custom exception to represent skipped records in Spring Batch jobs. The default implementation of `SkipPolicy` includes this exception.
- `BatchProperties`: Spring Boot configuration property class to read batch properties from the `application.properties` or `application.yml` file.
Following are the components autoconfigured as beans by Spring Boot with opinionated default behaviour. The defaults can be customized by configuration properties and custom implementations in the consumer application.

`JobParametersIncrementer` generates a unique run id for each job execution when force restarting already successfully completed jobs. Each job execution is uniquely identified by the combination of its identifying parameters. If a job is restarted with the same identifying parameters, Spring Batch throws `JobInstanceAlreadyCompleteException`. So, to force restart the job, the `AbstractJobExecutor#execute` method adds a unique `run.id` to the job execution parameters if the `forceRestart` argument is `true`. It can be overridden by defining a new `JobParametersIncrementer` bean in the consumer application. It requires a database sequence named `run_id_sequence` to generate the unique run id, which can be overridden by setting the `batch.run-id-sequence` property in the `application.properties` or `application.yml` file.
Important

An already running job cannot be restarted, as Spring Batch does not allow that. This behaviour can also be overridden, but doing so is not recommended.
```java
@ConditionalOnMissingBean
@Bean
JobParametersIncrementer jobParametersIncrementer(
    final DataSource dataSource, final BatchProperties batchProperties) {
  return new DataFieldMaxValueJobParametersIncrementer(
      new PostgresSequenceMaxValueIncrementer(dataSource, batchProperties.getRunIdSequence()));
}
```
```sql
CREATE SEQUENCE IF NOT EXISTS run_id_sequence START WITH 1 INCREMENT BY 1 NO MINVALUE NO MAXVALUE CACHE 1;
```
`BackOffPolicy` defines the back off policy for retrying failed steps. The default is `ExponentialBackOffPolicy`. The backoff delay and multiplier can be customized by setting the `batch.backoff-initial-delay` and `batch.backoff-multiplier` properties in the `application.properties` or `application.yml` file. It can be overridden by defining a new `BackOffPolicy` bean in the consumer application.
```java
@ConditionalOnMissingBean
@Bean
BackOffPolicy backOffPolicy(final BatchProperties batchProperties) {
  return BackOffPolicyBuilder.newBuilder()
      .delay(batchProperties.getBackoffInitialDelay().toMillis())
      .multiplier(batchProperties.getBackoffMultiplier())
      .build();
}
```
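As a hedged illustration of such an override, the following sketch swaps in Spring Retry's `FixedBackOffPolicy`; the configuration class name and the 5 second delay are illustrative assumptions, not part of this library:

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.retry.backoff.BackOffPolicy;
import org.springframework.retry.backoff.FixedBackOffPolicy;

@Configuration
class CustomBackOffConfiguration {

  // Replaces the autoconfigured ExponentialBackOffPolicy with a fixed delay.
  // The 5 second period is an example value, not a library default.
  @Bean
  BackOffPolicy backOffPolicy() {
    FixedBackOffPolicy backOffPolicy = new FixedBackOffPolicy();
    backOffPolicy.setBackOffPeriod(5000L); // milliseconds
    return backOffPolicy;
  }
}
```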
`RetryPolicy` defines the retry policy for retrying failed steps. By default, it retries on `TransientDataAccessException` and `RecoverableDataAccessException` exceptions for JPA and MongoDB. It works in conjunction with `BackOffPolicy`. It can be overridden by defining a new `RetryPolicy` bean in the consumer application and customized by setting the `batch.max-retries` property in the `application.properties` or `application.yml` file.
```java
@ConditionalOnMissingBean
@Bean
RetryPolicy retryPolicy(final BatchProperties batchProperties) {
  CompositeRetryPolicy retryPolicy = new CompositeRetryPolicy();
  retryPolicy.setPolicies(
      ArrayUtils.toArray(
          this.noRetryPolicy(batchProperties), this.daoRetryPolicy(batchProperties)));
  return retryPolicy;
}
```
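A minimal sketch of a consumer-side override using Spring Retry's `SimpleRetryPolicy`; the max attempts value and the exception set here are assumptions chosen for illustration:

```java
import java.util.Map;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.dao.TransientDataAccessException;
import org.springframework.retry.RetryPolicy;
import org.springframework.retry.policy.SimpleRetryPolicy;

@Configuration
class CustomRetryConfiguration {

  // Retries up to 5 times, and only for TransientDataAccessException.
  // Both the limit and the exception list are example choices.
  @Bean
  RetryPolicy retryPolicy() {
    return new SimpleRetryPolicy(5, Map.of(TransientDataAccessException.class, true));
  }
}
```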
`SkipPolicy` defines the skip policy for skipping records in case of exceptions. By default, it skips `ConstraintViolationException` and `SkipRecordException`. It can be customized by setting the `batch.skip-limit` property in the `application.properties` or `application.yml` file. It can be defined as `AlwaysSkipItemSkipPolicy` to skip all records in case of any exception. Skipped exceptions must also be specified in `noRollback` in the step configuration, which is handled by this library automatically. It can be overridden by defining a new `SkipPolicy` bean in the consumer application. Similarly, `skippedExceptions` can also be overridden.
```java
@ConditionalOnMissingBean
@Bean
SkipPolicy skipPolicy(final BatchProperties batchProperties) {
  Map<Class<? extends Throwable>, Boolean> exceptionClassifiers =
      this.skippedExceptions().stream().collect(Collectors.toMap(ex -> ex, ex -> Boolean.TRUE));
  return new LimitCheckingItemSkipPolicy(batchProperties.getSkipLimit(), exceptionClassifiers);
}

@ConditionalOnMissingBean
@Bean
List<Class<? extends Throwable>> skippedExceptions() {
  return List.of(ConstraintViolationException.class, SkipRecordException.class);
}
```
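For example, skipping all records on any exception (as mentioned above) could be sketched with Spring Batch's `AlwaysSkipItemSkipPolicy`; the configuration class name is an assumption:

```java
import org.springframework.batch.core.step.skip.AlwaysSkipItemSkipPolicy;
import org.springframework.batch.core.step.skip.SkipPolicy;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
class CustomSkipConfiguration {

  // Skips every failed record regardless of exception type or skip limit.
  @Bean
  SkipPolicy skipPolicy() {
    return new AlwaysSkipItemSkipPolicy();
  }
}
```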
`JobExecutionListener` default implementation is `LoggingJobListener`, which does nothing but logging. It can be overridden by defining a new `JobExecutionListener` bean in the consumer application.
```java
@ConditionalOnMissingBean
@Bean
JobExecutionListener jobExecutionListener() {
  return new LoggingJobListener();
}
```
`StepExecutionListener` default implementation is `LoggingStepListener`, which does nothing but logging. It can be overridden by defining a new `StepExecutionListener` bean in the consumer application.
```java
@ConditionalOnMissingBean
@Bean
StepExecutionListener stepExecutionListener() {
  return new LoggingStepListener();
}
```
Following are the configuration properties to customize default Spring Batch behaviour.
```yaml
batch:
  chunk-size: 100
  skip-limit: 10
  max-retries: 3
  backoff-initial-delay: PT3S
  backoff-multiplier: 2
  page-size: 300
  partition-size: 16
  trigger-partitioning-threshold: 100
  # task-executor: applicationTaskExecutor
  # run-id-sequence: run_id_sequence
```
- `batch.chunk-size`: Number of items processed in a single transaction by a chunk-oriented step. Default: 100.
- `batch.skip-limit`: Maximum number of items to skip as per the configured skip policy, exceeding which the job fails. Default: 10.
- `batch.max-retries`: Maximum number of retry attempts as per the configured retry policy, exceeding which the job fails. Default: 3.
- `batch.backoff-initial-delay`: Time duration (in `java.time.Duration` format) to wait before the first retry attempt is made after a failure.
- `batch.backoff-multiplier`: Factor by which the delay between consecutive retries is multiplied. Default: 3.
- `batch.page-size`: Number of records read per page by paging item readers. Default: 100.
- `batch.partition-size`: Number of partitions used to process the data concurrently. Should be tuned to the available machine resources. Default: 8.
- `batch.trigger-partitioning-threshold`: Minimum number of records to trigger partitioning; below this, partitioning could be counterproductive. Default: 100.
- `batch.task-executor`: Bean name of the task executor used for executing the jobs. By default `SyncTaskExecutor` is used. Set to `applicationTaskExecutor` to use the `SimpleAsyncTaskExecutor` provided by Spring, or define any other custom `TaskExecutor` and set its bean name here. Do not set this property in Spring Cloud Task applications, only in Spring Rest applications.
- `batch.run-id-sequence`: Run id database sequence name. Default: `run_id_sequence`.
Important

To benefit from Java 21 virtual threads with Spring Boot 3.2, define a `VirtualThreadTaskExecutor` bean and configure its name in the `batch.task-executor` property.
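A minimal sketch of such a configuration, assuming Spring Framework 6.1+ on Java 21; the bean name `batchTaskExecutor` and the thread name prefix are assumptions to be matched in `batch.task-executor`:

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.TaskExecutor;
import org.springframework.core.task.VirtualThreadTaskExecutor;

@Configuration
class VirtualThreadConfiguration {

  // Runs each submitted task on a new Java 21 virtual thread.
  // Set batch.task-executor: batchTaskExecutor to make the library use it.
  @Bean("batchTaskExecutor")
  TaskExecutor batchTaskExecutor() {
    return new VirtualThreadTaskExecutor("batch-");
  }
}
```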
Built on Java 21, Spring Boot 3.2.0+ and Spring Batch 5.1.0+. For Java 17, build from source after changing the Java version in `pom.xml` as follows.

pom.xml

```xml
<properties>
    <java.version>17</java.version>
</properties>
```
Current version: 1.0
Add the `spring-batch-commons` jar to the application dependencies.
Maven

```xml
<dependency>
    <groupId>io.github.officiallysingh</groupId>
    <artifactId>spring-batch-commons</artifactId>
    <version>1.0</version>
</dependency>
```
Gradle

```groovy
implementation 'io.github.officiallysingh:spring-batch-commons:1.0'
```
Define jobs as beans by extending the `JobConfigurationSupport` class. Default configurations can be overridden for a particular `Job` by overriding the respective methods from `JobConfigurationSupport`, such as `retryPolicy`, `skipPolicy` etc. To override default beans globally, define a new bean with the same name in the consumer application. Refer to example `StatementJobConfiguration`.
- Define `ItemReader`, `ItemProcessor` and `ItemWriter` beans for each job.
- To define a simple job, use the `simpleJob` method in `JobConfigurationSupport` and return a `Job` bean.
```java
@Bean
Job statementJob(
    final ItemReader<DailyTransaction> transactionReader,
    final ItemProcessor<DailyTransaction, Statement> statementProcessor,
    final ItemWriter<Statement> statementWriter) {
  return newSimpleJob(
      AppConstants.STATEMENT_JOB_NAME,
      transactionReader,
      statementProcessor,
      statementWriter);
}
```
- To define a partitioned job, use the `partitionedJob` method in `JobConfigurationSupport` and return a `Job` bean.
```java
@Bean
Job statementJob(
    @Qualifier("statementJobPartitioner") final AccountsPartitioner statementJobPartitioner,
    final ItemReader<DailyTransaction> transactionReader,
    final ItemProcessor<DailyTransaction, Statement> statementProcessor,
    final ItemWriter<Statement> statementWriter)
    throws Exception {
  return newPartitionedJob(
      AppConstants.STATEMENT_JOB_NAME,
      statementJobPartitioner,
      transactionReader,
      statementProcessor,
      statementWriter);
}
```
- Partitioned jobs also require a partitioner bean to define the partitioning strategy. Define a `Partitioner` bean by extending `AbstractPartitioner` and overriding the `partitioningList` method to return a `List` of partitioning candidate `String`s. Refer to example `AccountsPartitioner`.
Note

Multiple partitions are created only when the total number of records returned by the `partitioningList` method is greater than the `batch.trigger-partitioning-threshold` property. Otherwise, all records are processed in a single partition.
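A hedged sketch of such a partitioner, assuming only the `partitioningList` contract described above; the `AccountRepository` and its `findAllAccountIds` method are hypothetical stand-ins, not part of this library:

```java
// Hypothetical partitioner returning account ids as partitioning candidates.
// AbstractPartitioner and partitioningList come from this library; the
// repository and its query method are illustrative only.
@Component("statementJobPartitioner")
class AccountsPartitioner extends AbstractPartitioner {

  private final AccountRepository accountRepository;

  AccountsPartitioner(final AccountRepository accountRepository) {
    this.accountRepository = accountRepository;
  }

  @Override
  public List<String> partitioningList() {
    // Each account id becomes a candidate to be bucketed into a partition.
    return this.accountRepository.findAllAccountIds();
  }
}
```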
- Define a job executor bean by extending `AbstractJobExecutor` to execute the job. Refer to example `StatementJobExecutor`.
- Define a `SkipListener` bean to handle skipped records. You may want to save skipped records in a separate collection or table and retry them later. Refer to example `StatementJobSkipListener`.
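As a hedged sketch of such a listener, the following implements Spring Batch's `SkipListener` callbacks; the class name and the in-memory list (a stand-in for a real dead-letter table or collection) are illustrative assumptions:

```java
import java.util.ArrayList;
import java.util.List;
import org.springframework.batch.core.SkipListener;
import org.springframework.stereotype.Component;

// Hypothetical listener that collects the causes of skipped records so
// they can be persisted and retried later.
@Component
class CollectingSkipListener<T, S> implements SkipListener<T, S> {

  private final List<Throwable> skipped = new ArrayList<>();

  @Override
  public void onSkipInRead(final Throwable t) {
    this.skipped.add(t); // a record could not be read
  }

  @Override
  public void onSkipInProcess(final T item, final Throwable t) {
    this.skipped.add(t); // the item failed during processing
  }

  @Override
  public void onSkipInWrite(final S item, final Throwable t) {
    this.skipped.add(t); // the item failed during writing
  }

  public List<Throwable> getSkipped() {
    return this.skipped;
  }
}
```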
Important

Any component needing access to `stepExecutionContext` must be defined as a `@StepScope` bean, and any component needing access to `jobParameters` or `jobExecutionContext` must be defined as a `@JobScope` bean.
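For example, a `@StepScope` bean can late-bind values from the step execution context via a SpEL expression; the bean name and the `partition` context key below are assumptions for illustration:

```java
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
class StepScopedBeans {

  // Resolved lazily per step execution, so the stepExecutionContext
  // entry is available at injection time. The "partition" key is an
  // illustrative assumption, not a key this library defines.
  @Bean
  @StepScope
  String partitionName(@Value("#{stepExecutionContext['partition']}") final String partition) {
    return partition;
  }
}
```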
Rajveer Singh, In case you find any issues or need any support, please email me at [email protected]
- Refer to Spring Batch Job implemented as Spring Cloud Task: `spring-boot-batch-cloud-task`.
- Refer to Spring Batch Job implemented as a Spring Rest application: `spring-boot-batch-web`.
- For exception handling, refer to `spring-boot-problem-handler`.
- For Spring Data MongoDB auditing, refer to `spring-boot-mongodb-auditing`.