diff --git a/spring-batch-core/src/main/java/org/springframework/batch/core/step/item/ChunkOrientedStep.java b/spring-batch-core/src/main/java/org/springframework/batch/core/step/item/ChunkOrientedStep.java index 2c86d54e21..f4f79e75df 100644 --- a/spring-batch-core/src/main/java/org/springframework/batch/core/step/item/ChunkOrientedStep.java +++ b/spring-batch-core/src/main/java/org/springframework/batch/core/step/item/ChunkOrientedStep.java @@ -51,6 +51,9 @@ import org.springframework.batch.core.step.StepInterruptionPolicy; import org.springframework.batch.core.step.ThreadStepInterruptionPolicy; import org.springframework.batch.core.step.skip.AlwaysSkipItemSkipPolicy; +import org.springframework.batch.core.step.skip.NonSkippableProcessException; +import org.springframework.batch.core.step.skip.NonSkippableReadException; +import org.springframework.batch.core.step.skip.NonSkippableWriteException; import org.springframework.batch.core.step.skip.SkipPolicy; import org.springframework.batch.infrastructure.item.Chunk; import org.springframework.batch.infrastructure.item.ExecutionContext; @@ -560,6 +563,9 @@ private void doSkipInRead(RetryException retryException, StepContribution contri this.compositeSkipListener.onSkipInRead(cause); contribution.incrementReadSkipCount(); } + else { + throw new NonSkippableReadException("Skip policy rejected skipping item", cause); + } } private Chunk processChunk(Chunk chunk, StepContribution contribution) throws Exception { @@ -653,6 +659,9 @@ private void doSkipInProcess(I item, RetryException retryException, StepContribu this.compositeSkipListener.onSkipInProcess(item, retryException.getCause()); contribution.incrementProcessSkipCount(); } + else { + throw new NonSkippableProcessException("Skip policy rejected skipping item", cause); + } } private void writeChunk(Chunk chunk, StepContribution contribution) throws Exception { @@ -737,6 +746,7 @@ private void scan(Chunk chunk, StepContribution contribution) { else { logger.error("Failed to write item: " + item, exception); this.compositeItemWriteListener.onWriteError(exception, singleItemChunk); + throw new NonSkippableWriteException("Skip policy rejected skipping item", exception); } } } diff --git a/spring-batch-core/src/test/java/org/springframework/batch/core/step/item/ChunkOrientedStepTests.java b/spring-batch-core/src/test/java/org/springframework/batch/core/step/item/ChunkOrientedStepTests.java index 244719d754..681c993e21 100644 --- a/spring-batch-core/src/test/java/org/springframework/batch/core/step/item/ChunkOrientedStepTests.java +++ b/spring-batch-core/src/test/java/org/springframework/batch/core/step/item/ChunkOrientedStepTests.java @@ -17,21 +17,32 @@ import org.junit.jupiter.api.Test; +import org.springframework.batch.core.ExitStatus; import org.springframework.batch.core.job.JobExecution; import org.springframework.batch.core.job.JobInstance; import org.springframework.batch.core.job.parameters.JobParameters; import org.springframework.batch.core.repository.JobRepository; import org.springframework.batch.core.repository.support.ResourcelessJobRepository; +import org.springframework.batch.core.step.FatalStepExecutionException; import org.springframework.batch.core.step.StepExecution; +import org.springframework.batch.core.step.skip.NeverSkipItemSkipPolicy; +import org.springframework.batch.core.step.skip.NonSkippableProcessException; +import org.springframework.batch.infrastructure.item.ItemProcessor; import org.springframework.batch.infrastructure.item.ItemReader; import org.springframework.batch.infrastructure.item.ItemWriter; +import org.springframework.core.retry.policy.SimpleRetryPolicy; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertInstanceOf; +import static org.junit.jupiter.api.Assertions.assertThrows; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; /** + * Tests for {@link ChunkOrientedStep}. + * * @author Mahmoud Ben Hassine */ public class ChunkOrientedStepTests { @@ -57,4 +68,46 @@ void testReadNoMoreThanAvailableItems() throws Exception { verify(reader, times(6)).read(); } + @Test + void testDoSkipInProcessThrowsNonSkippableProcessExceptionWhenSkipPolicyReturnsFalse() throws Exception { + // given - fault-tolerant step with NeverSkipItemSkipPolicy and retry limit + ItemReader reader = mock(); + when(reader.read()).thenReturn("item1", "item2", "item3", null); + + ItemProcessor processor = item -> { + if ("item2".equals(item)) { + throw new RuntimeException("Processing failed for item2"); + } + return item.toUpperCase(); + }; + + ItemWriter writer = chunk -> { + }; + + JobRepository jobRepository = new ResourcelessJobRepository(); + ChunkOrientedStep step = new ChunkOrientedStep<>("step", 3, reader, writer, jobRepository); + step.setItemProcessor(processor); + step.setFaultTolerant(true); + step.setRetryPolicy(new SimpleRetryPolicy(2)); // retry once (initial + 1 retry) + step.setSkipPolicy(new NeverSkipItemSkipPolicy()); // never skip + step.afterPropertiesSet(); + + JobInstance jobInstance = new JobInstance(1L, "job"); + JobExecution jobExecution = new JobExecution(1L, jobInstance, new JobParameters()); + StepExecution stepExecution = new StepExecution(1L, "step", jobExecution); + + // when - execute step + FatalStepExecutionException exception = assertThrows(FatalStepExecutionException.class, () -> { + step.execute(stepExecution); + }); + + // then - should throw NonSkippableProcessException + Throwable cause = exception.getCause(); + assertInstanceOf(NonSkippableProcessException.class, cause, + "Expected NonSkippableProcessException when skip policy rejects skipping"); + assertEquals("Skip policy rejected skipping item", cause.getMessage()); + assertEquals(ExitStatus.FAILED.getExitCode(), stepExecution.getExitStatus().getExitCode()); + assertEquals(0, stepExecution.getProcessSkipCount(), "Process skip count should be 0"); + } + }