Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,9 @@
import org.springframework.batch.core.step.StepInterruptionPolicy;
import org.springframework.batch.core.step.ThreadStepInterruptionPolicy;
import org.springframework.batch.core.step.skip.AlwaysSkipItemSkipPolicy;
import org.springframework.batch.core.step.skip.NonSkippableProcessException;
import org.springframework.batch.core.step.skip.NonSkippableReadException;
import org.springframework.batch.core.step.skip.NonSkippableWriteException;
import org.springframework.batch.core.step.skip.SkipPolicy;
import org.springframework.batch.infrastructure.item.Chunk;
import org.springframework.batch.infrastructure.item.ExecutionContext;
Expand Down Expand Up @@ -560,6 +563,9 @@ private void doSkipInRead(RetryException retryException, StepContribution contri
this.compositeSkipListener.onSkipInRead(cause);
contribution.incrementReadSkipCount();
}
else {
throw new NonSkippableReadException("Skip policy rejected skipping item", cause);
}
}

private Chunk<O> processChunk(Chunk<I> chunk, StepContribution contribution) throws Exception {
Expand Down Expand Up @@ -653,6 +659,9 @@ private void doSkipInProcess(I item, RetryException retryException, StepContribu
this.compositeSkipListener.onSkipInProcess(item, retryException.getCause());
contribution.incrementProcessSkipCount();
}
else {
throw new NonSkippableProcessException("Skip policy rejected skipping item", cause);
}
}

private void writeChunk(Chunk<O> chunk, StepContribution contribution) throws Exception {
Expand Down Expand Up @@ -737,6 +746,7 @@ private void scan(Chunk<O> chunk, StepContribution contribution) {
else {
logger.error("Failed to write item: " + item, exception);
this.compositeItemWriteListener.onWriteError(exception, singleItemChunk);
throw new NonSkippableWriteException("Skip policy rejected skipping item", exception);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,21 +17,32 @@

import org.junit.jupiter.api.Test;

import org.springframework.batch.core.ExitStatus;
import org.springframework.batch.core.job.JobExecution;
import org.springframework.batch.core.job.JobInstance;
import org.springframework.batch.core.job.parameters.JobParameters;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.repository.support.ResourcelessJobRepository;
import org.springframework.batch.core.step.FatalStepExecutionException;
import org.springframework.batch.core.step.StepExecution;
import org.springframework.batch.core.step.skip.NeverSkipItemSkipPolicy;
import org.springframework.batch.core.step.skip.NonSkippableProcessException;
import org.springframework.batch.infrastructure.item.ItemProcessor;
import org.springframework.batch.infrastructure.item.ItemReader;
import org.springframework.batch.infrastructure.item.ItemWriter;
import org.springframework.core.retry.policy.SimpleRetryPolicy;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

/**
* Tests for {@link ChunkOrientedStep}.
*
* @author Mahmoud Ben Hassine
*/
public class ChunkOrientedStepTests {
Expand All @@ -57,4 +68,46 @@ void testReadNoMoreThanAvailableItems() throws Exception {
verify(reader, times(6)).read();
}

@Test
void testDoSkipInProcessThrowsNonSkippableProcessExceptionWhenSkipPolicyReturnsFalse() throws Exception {
// given - fault-tolerant step with NeverSkipItemSkipPolicy and retry limit
ItemReader<String> reader = mock();
when(reader.read()).thenReturn("item1", "item2", "item3", null);

ItemProcessor<String, String> processor = item -> {
if ("item2".equals(item)) {
throw new RuntimeException("Processing failed for item2");
}
return item.toUpperCase();
};

ItemWriter<String> writer = chunk -> {
};

JobRepository jobRepository = new ResourcelessJobRepository();
ChunkOrientedStep<String, String> step = new ChunkOrientedStep<>("step", 3, reader, writer, jobRepository);
step.setItemProcessor(processor);
step.setFaultTolerant(true);
step.setRetryPolicy(new SimpleRetryPolicy(2)); // retry once (initial + 1 retry)
step.setSkipPolicy(new NeverSkipItemSkipPolicy()); // never skip
step.afterPropertiesSet();

JobInstance jobInstance = new JobInstance(1L, "job");
JobExecution jobExecution = new JobExecution(1L, jobInstance, new JobParameters());
StepExecution stepExecution = new StepExecution(1L, "step", jobExecution);

// when - execute step
FatalStepExecutionException exception = assertThrows(FatalStepExecutionException.class, () -> {
step.execute(stepExecution);
});

// then - should throw NonSkippableProcessException
Throwable cause = exception.getCause();
assertInstanceOf(NonSkippableProcessException.class, cause,
"Expected NonSkippableProcessException when skip policy rejects skipping");
assertEquals("Skip policy rejected skipping item", cause.getMessage());
assertEquals(ExitStatus.FAILED.getExitCode(), stepExecution.getExitStatus().getExitCode());
assertEquals(0, stepExecution.getProcessSkipCount(), "Process skip count should be 0");
}

}