Skip to content

Commit 95e1896

Browse files
committed
Add concurrency support in ChunkOrientedStep
Resolves #4955
1 parent 0501a87 commit 95e1896

File tree

3 files changed

+371
-69
lines changed

3 files changed

+371
-69
lines changed

spring-batch-core/src/main/java/org/springframework/batch/core/step/builder/ChunkOrientedStepBuilder.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@
5252
import org.springframework.batch.support.ReflectionUtils;
5353
import org.springframework.core.retry.RetryListener;
5454
import org.springframework.core.retry.RetryPolicy;
55+
import org.springframework.core.task.AsyncTaskExecutor;
5556
import org.springframework.transaction.PlatformTransactionManager;
5657
import org.springframework.transaction.interceptor.DefaultTransactionAttribute;
5758
import org.springframework.transaction.interceptor.TransactionAttribute;
@@ -93,6 +94,8 @@ public class ChunkOrientedStepBuilder<I, O> extends StepBuilderHelper<ChunkOrien
9394

9495
private final Set<SkipListener<I, O>> skipListeners = new LinkedHashSet<>();
9596

97+
private AsyncTaskExecutor asyncTaskExecutor;
98+
9699
/**
97100
* Create a new {@link ChunkOrientedStepBuilder} with the given job repository and
98101
* transaction manager. The step name will be assigned to the bean name.
@@ -296,6 +299,18 @@ public ChunkOrientedStepBuilder<I, O> skipListener(SkipListener<I, O> skipListen
296299
return self();
297300
}
298301

302+
/**
303+
* Set the asynchronous task executor to be used for processing items concurrently.
304+
* This allows for concurrent processing of items, improving performance and
305+
* throughput. If not set, the step will process items sequentially.
306+
* @param asyncTaskExecutor the asynchronous task executor to use
307+
* @return this for fluent chaining
308+
*/
309+
public ChunkOrientedStepBuilder<I, O> taskExecutor(AsyncTaskExecutor asyncTaskExecutor) {
310+
this.asyncTaskExecutor = asyncTaskExecutor;
311+
return self();
312+
}
313+
299314
@SuppressWarnings("unchecked")
300315
public ChunkOrientedStep<I, O> build() {
301316
ChunkOrientedStep<I, O> chunkOrientedStep = new ChunkOrientedStep<>(this.getName(), this.chunkSize, this.reader,
@@ -306,6 +321,9 @@ public ChunkOrientedStep<I, O> build() {
306321
chunkOrientedStep.setRetryPolicy(this.retryPolicy);
307322
chunkOrientedStep.setSkipPolicy(this.skipPolicy);
308323
chunkOrientedStep.setFaultTolerant(this.faultTolerant);
324+
if (this.asyncTaskExecutor != null) {
325+
chunkOrientedStep.setTaskExecutor(this.asyncTaskExecutor);
326+
}
309327
streams.forEach(chunkOrientedStep::registerItemStream);
310328
stepListeners.forEach(stepListener -> {
311329
if (stepListener instanceof ItemReadListener) {

spring-batch-core/src/main/java/org/springframework/batch/core/step/item/ChunkOrientedStep.java

Lines changed: 178 additions & 69 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,10 @@
1515
*/
1616
package org.springframework.batch.core.step.item;
1717

18+
import java.util.LinkedList;
19+
import java.util.List;
20+
import java.util.concurrent.Future;
21+
1822
import org.apache.commons.logging.Log;
1923
import org.apache.commons.logging.LogFactory;
2024
import org.jspecify.annotations.Nullable;
@@ -30,6 +34,8 @@
3034
import org.springframework.batch.core.listener.ItemReadListener;
3135
import org.springframework.batch.core.listener.ItemWriteListener;
3236
import org.springframework.batch.core.listener.SkipListener;
37+
import org.springframework.batch.core.scope.context.StepContext;
38+
import org.springframework.batch.core.scope.context.StepSynchronizationManager;
3339
import org.springframework.batch.core.step.StepContribution;
3440
import org.springframework.batch.core.step.StepExecution;
3541
import org.springframework.batch.core.repository.JobRepository;
@@ -52,6 +58,7 @@
5258
import org.springframework.core.retry.RetryTemplate;
5359
import org.springframework.core.retry.Retryable;
5460
import org.springframework.core.retry.support.CompositeRetryListener;
61+
import org.springframework.core.task.AsyncTaskExecutor;
5562
import org.springframework.transaction.PlatformTransactionManager;
5663
import org.springframework.transaction.TransactionStatus;
5764
import org.springframework.transaction.interceptor.DefaultTransactionAttribute;
@@ -61,11 +68,13 @@
6168
import org.springframework.util.Assert;
6269

6370
/**
64-
* Step implementation for the chunk-oriented processing model.
71+
* Step implementation for the chunk-oriented processing model. This class also supports
72+
* faut-tolerance features (retry and skip) as well as concurrent item processing when a
73+
* {@link AsyncTaskExecutor} is provided.
6574
*
66-
* @author Mahmoud Ben Hassine
6775
* @param <I> type of input items
6876
* @param <O> type of output items
77+
* @author Mahmoud Ben Hassine
6978
* @since 6.0
7079
*/
7180
public class ChunkOrientedStep<I, O> extends AbstractStep {
@@ -128,6 +137,11 @@ public class ChunkOrientedStep<I, O> extends AbstractStep {
128137

129138
private final CompositeSkipListener<I, O> compositeSkipListener = new CompositeSkipListener<>();
130139

140+
/*
141+
* Concurrency parameters
142+
*/
143+
private AsyncTaskExecutor taskExecutor;
144+
131145
/**
132146
* Create a new {@link ChunkOrientedStep}.
133147
* @param name the name of the step
@@ -232,6 +246,15 @@ public void setFaultTolerant(boolean faultTolerant) {
232246
this.faultTolerant = faultTolerant;
233247
}
234248

249+
/**
250+
* Set the {@link AsyncTaskExecutor} to use for processing items asynchronously.
251+
* @param asyncTaskExecutor the asynchronous task executor to set
252+
*/
253+
public void setTaskExecutor(AsyncTaskExecutor asyncTaskExecutor) {
254+
Assert.notNull(asyncTaskExecutor, "Task executor must not be null");
255+
this.taskExecutor = asyncTaskExecutor;
256+
}
257+
235258
/**
236259
* Set the {@link RetryPolicy} for this step.
237260
* @param retryPolicy the retry policy to set
@@ -314,35 +337,93 @@ protected void doExecute(StepExecution stepExecution) throws Exception {
314337
@Override
315338
protected void doInTransactionWithoutResult(TransactionStatus status) {
316339
StepContribution contribution = stepExecution.createStepContribution();
317-
Chunk<I> inputChunk = new Chunk<>();
318-
Chunk<O> processedChunk = new Chunk<>();
319-
try {
320-
inputChunk = read(contribution);
321-
if (inputChunk.isEmpty()) {
322-
return;
323-
}
324-
compositeChunkListener.beforeChunk(inputChunk);
325-
processedChunk = process(inputChunk, contribution);
326-
write(processedChunk, contribution);
327-
compositeChunkListener.afterChunk(processedChunk);
328-
stepExecution.apply(contribution);
329-
stepExecution.incrementCommitCount();
330-
compositeItemStream.update(stepExecution.getExecutionContext());
331-
getJobRepository().update(stepExecution);
332-
getJobRepository().updateExecutionContext(stepExecution);
340+
if (isConcurrent()) {
341+
processChunkConcurrently(status, contribution, stepExecution);
333342
}
334-
catch (Exception e) {
335-
logger.error("Rolling back chunk transaction", e);
336-
status.setRollbackOnly();
337-
stepExecution.incrementRollbackCount();
338-
compositeChunkListener.onChunkError(e, processedChunk);
339-
throw new FatalStepExecutionException("Unable to process chunk", e);
343+
else {
344+
processChunkSequentially(status, contribution, stepExecution);
340345
}
341346
}
342347
});
343348
}
344349
}
345350

351+
private void processChunkConcurrently(TransactionStatus status, StepContribution contribution,
352+
StepExecution stepExecution) {
353+
List<Future<O>> itemProcessingTasks = new LinkedList<>();
354+
try {
355+
// read items and submit concurrent item processing tasks
356+
for (int i = 0; i < this.chunkSize; i++) {
357+
I item = readItem(contribution);
358+
if (item != null) {
359+
Future<O> itemProcessingFuture = this.taskExecutor.submit(() -> processItem(item, contribution));
360+
itemProcessingTasks.add(itemProcessingFuture);
361+
}
362+
}
363+
// exclude empty chunks (when the total items is a multiple of the chunk size)
364+
if (itemProcessingTasks.isEmpty()) {
365+
return;
366+
}
367+
368+
// collect processed items
369+
Chunk<O> processedChunk = new Chunk<>();
370+
for (Future<O> future : itemProcessingTasks) {
371+
O processedItem = future.get();
372+
if (processedItem != null) {
373+
processedChunk.add(processedItem);
374+
}
375+
}
376+
377+
// write processed items
378+
writeChunk(processedChunk, contribution);
379+
380+
// apply contribution and update job repository
381+
stepExecution.apply(contribution);
382+
stepExecution.incrementCommitCount();
383+
this.compositeItemStream.update(stepExecution.getExecutionContext());
384+
getJobRepository().update(stepExecution);
385+
getJobRepository().updateExecutionContext(stepExecution);
386+
387+
}
388+
catch (Exception e) {
389+
logger.error("Rolling back chunk transaction", e);
390+
status.setRollbackOnly();
391+
stepExecution.incrementRollbackCount();
392+
throw new FatalStepExecutionException("Unable to process chunk", e);
393+
}
394+
395+
}
396+
397+
private void processChunkSequentially(TransactionStatus status, StepContribution contribution,
398+
StepExecution stepExecution) {
399+
Chunk<I> inputChunk = new Chunk<>();
400+
Chunk<O> processedChunk = new Chunk<>();
401+
try {
402+
inputChunk = readChunk(contribution);
403+
if (inputChunk.isEmpty()) {
404+
return;
405+
}
406+
compositeChunkListener.beforeChunk(inputChunk);
407+
processedChunk = processChunk(inputChunk, contribution);
408+
writeChunk(processedChunk, contribution);
409+
compositeChunkListener.afterChunk(processedChunk);
410+
411+
// apply contribution and update job repository
412+
stepExecution.apply(contribution);
413+
stepExecution.incrementCommitCount();
414+
compositeItemStream.update(stepExecution.getExecutionContext());
415+
getJobRepository().update(stepExecution);
416+
getJobRepository().updateExecutionContext(stepExecution);
417+
}
418+
catch (Exception e) {
419+
logger.error("Rolling back chunk transaction", e);
420+
status.setRollbackOnly();
421+
stepExecution.incrementRollbackCount();
422+
compositeChunkListener.onChunkError(e, processedChunk);
423+
throw new FatalStepExecutionException("Unable to process chunk", e);
424+
}
425+
}
426+
346427
/*
347428
* Check if the step has been interrupted either internally via user defined policy or
348429
* externally via job operator. This will be checked at chunk boundaries.
@@ -362,36 +443,42 @@ private boolean interrupted(StepExecution stepExecution) {
362443
return false;
363444
}
364445

365-
private Chunk<I> read(StepContribution contribution) throws Exception {
446+
private Chunk<I> readChunk(StepContribution contribution) throws Exception {
366447
Chunk<I> chunk = new Chunk<>();
367448
for (int i = 0; i < chunkSize; i++) {
368-
this.compositeItemReadListener.beforeRead();
369-
try {
370-
I item = doRead();
371-
if (item == null) {
372-
chunkTracker.noMoreItems();
373-
break;
374-
}
375-
else {
376-
chunk.add(item);
377-
contribution.incrementReadCount();
378-
this.compositeItemReadListener.afterRead(item);
379-
}
380-
}
381-
catch (Exception exception) {
382-
this.compositeItemReadListener.onReadError(exception);
383-
if (this.faultTolerant && exception instanceof RetryException retryException) {
384-
doSkipInRead(retryException, contribution);
385-
}
386-
else {
387-
throw exception;
388-
}
449+
I item = readItem(contribution);
450+
if (item != null) {
451+
chunk.add(item);
389452
}
390-
391453
}
392454
return chunk;
393455
}
394456

457+
@Nullable private I readItem(StepContribution contribution) throws Exception {
458+
this.compositeItemReadListener.beforeRead();
459+
I item = null;
460+
try {
461+
item = doRead();
462+
if (item == null) {
463+
this.chunkTracker.noMoreItems();
464+
}
465+
else {
466+
contribution.incrementReadCount();
467+
this.compositeItemReadListener.afterRead(item);
468+
}
469+
}
470+
catch (Exception exception) {
471+
this.compositeItemReadListener.onReadError(exception);
472+
if (this.faultTolerant && exception instanceof RetryException retryException) {
473+
doSkipInRead(retryException, contribution);
474+
}
475+
else {
476+
throw exception;
477+
}
478+
}
479+
return item;
480+
}
481+
395482
@Nullable private I doRead() throws Exception {
396483
if (this.faultTolerant) {
397484
Retryable<I> retryableRead = new Retryable<>() {
@@ -420,39 +507,57 @@ private void doSkipInRead(RetryException retryException, StepContribution contri
420507
}
421508
}
422509

423-
private Chunk<O> process(Chunk<I> chunk, StepContribution contribution) throws Exception {
510+
private Chunk<O> processChunk(Chunk<I> chunk, StepContribution contribution) throws Exception {
424511
Chunk<O> processedChunk = new Chunk<>();
425512
for (I item : chunk) {
426-
try {
427-
this.compositeItemProcessListener.beforeProcess(item);
428-
O processedItem = doProcess(item);
429-
if (processedItem == null) {
430-
contribution.incrementFilterCount();
431-
}
432-
else {
433-
processedChunk.add(processedItem);
434-
}
435-
this.compositeItemProcessListener.afterProcess(item, processedItem);
436-
}
437-
catch (Exception exception) {
438-
this.compositeItemProcessListener.onProcessError(item, exception);
439-
if (this.faultTolerant && exception instanceof RetryException retryException) {
440-
doSkipInProcess(item, retryException, contribution);
441-
}
442-
else {
443-
throw exception;
444-
}
513+
O processedItem = processItem(item, contribution);
514+
if (processedItem != null) {
515+
processedChunk.add(processedItem);
445516
}
446517
}
447518
return processedChunk;
448519
}
449520

521+
private O processItem(I item, StepContribution contribution) throws Exception {
522+
O processedItem = null;
523+
try {
524+
this.compositeItemProcessListener.beforeProcess(item);
525+
processedItem = doProcess(item);
526+
if (processedItem == null) {
527+
contribution.incrementFilterCount();
528+
}
529+
this.compositeItemProcessListener.afterProcess(item, processedItem);
530+
}
531+
catch (Exception exception) {
532+
this.compositeItemProcessListener.onProcessError(item, exception);
533+
if (this.faultTolerant && exception instanceof RetryException retryException) {
534+
doSkipInProcess(item, retryException, contribution);
535+
}
536+
else {
537+
throw exception;
538+
}
539+
}
540+
return processedItem;
541+
}
542+
450543
@Nullable private O doProcess(I item) throws Exception {
451544
if (this.faultTolerant) {
452545
Retryable<O> retryableProcess = new Retryable<>() {
453546
@Override
454547
public @Nullable O execute() throws Throwable {
455-
return itemProcessor.process(item);
548+
StepContext context = StepSynchronizationManager.getContext();
549+
final StepExecution stepExecution = context == null ? null : context.getStepExecution();
550+
if (isConcurrent() && stepExecution != null) {
551+
StepSynchronizationManager.register(stepExecution);
552+
}
553+
try {
554+
return itemProcessor.process(item);
555+
}
556+
finally {
557+
if (isConcurrent() && stepExecution != null) {
558+
StepSynchronizationManager.close();
559+
}
560+
}
456561
}
457562

458563
@Override
@@ -475,7 +580,7 @@ private void doSkipInProcess(I item, RetryException retryException, StepContribu
475580
}
476581
}
477582

478-
private void write(Chunk<O> chunk, StepContribution contribution) throws Exception {
583+
private void writeChunk(Chunk<O> chunk, StepContribution contribution) throws Exception {
479584
try {
480585
this.compositeItemWriteListener.beforeWrite(chunk);
481586
doWrite(chunk);
@@ -538,6 +643,10 @@ private void scan(Chunk<O> chunk, StepContribution contribution) {
538643
}
539644
}
540645

646+
private boolean isConcurrent() {
647+
return this.taskExecutor != null;
648+
}
649+
541650
private static class ChunkTracker {
542651

543652
private boolean moreItems = true;

0 commit comments

Comments
 (0)