Skip to content

Commit 25e33fb

Browse files
committed
Fix last step execution retrieval in MongoStepExecutionDao
Cherry-picked from commit 845f2c3 and adapted to 5.2.x
1 parent c36df65 commit 25e33fb

File tree

2 files changed

+162
-8
lines changed

2 files changed

+162
-8
lines changed

spring-batch-core/src/main/java/org/springframework/batch/core/repository/dao/MongoStepExecutionDao.java

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2024 the original author or authors.
2+
* Copyright 2024-2025 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -15,7 +15,6 @@
1515
*/
1616
package org.springframework.batch.core.repository.dao;
1717

18-
import java.util.ArrayList;
1918
import java.util.Collection;
2019
import java.util.Comparator;
2120
import java.util.List;
@@ -100,20 +99,21 @@ public StepExecution getStepExecution(JobExecution jobExecution, Long stepExecut
10099
public StepExecution getLastStepExecution(JobInstance jobInstance, String stepName) {
101100
// TODO optimize the query
102101
// get all step executions
103-
List<org.springframework.batch.core.repository.persistence.StepExecution> stepExecutions = new ArrayList<>();
104102
Query query = query(where("jobInstanceId").is(jobInstance.getId()));
105103
List<org.springframework.batch.core.repository.persistence.JobExecution> jobExecutions = this.mongoOperations
106104
.find(query, org.springframework.batch.core.repository.persistence.JobExecution.class,
107105
JOB_EXECUTIONS_COLLECTION_NAME);
108-
for (org.springframework.batch.core.repository.persistence.JobExecution jobExecution : jobExecutions) {
109-
stepExecutions.addAll(jobExecution.getStepExecutions());
110-
}
106+
List<org.springframework.batch.core.repository.persistence.StepExecution> stepExecutions = this.mongoOperations
107+
.find(query(where("jobExecutionId").in(jobExecutions.stream()
108+
.map(org.springframework.batch.core.repository.persistence.JobExecution::getJobExecutionId)
109+
.toList())), org.springframework.batch.core.repository.persistence.StepExecution.class,
110+
STEP_EXECUTIONS_COLLECTION_NAME);
111111
// sort step executions by creation date then id (see contract) and return the
112-
// first one
112+
// last one
113113
Optional<org.springframework.batch.core.repository.persistence.StepExecution> lastStepExecution = stepExecutions
114114
.stream()
115115
.filter(stepExecution -> stepExecution.getName().equals(stepName))
116-
.min(Comparator
116+
.max(Comparator
117117
.comparing(org.springframework.batch.core.repository.persistence.StepExecution::getCreateTime)
118118
.thenComparing(org.springframework.batch.core.repository.persistence.StepExecution::getId));
119119
if (lastStepExecution.isPresent()) {
Lines changed: 154 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,154 @@
1+
/*
2+
* Copyright 2025-present the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package org.springframework.batch.core.repository.support;
17+
18+
import java.util.Map;
19+
20+
import com.mongodb.client.MongoCollection;
21+
import org.bson.Document;
22+
import org.junit.jupiter.api.Assertions;
23+
import org.junit.jupiter.api.BeforeEach;
24+
import org.junit.jupiter.api.Test;
25+
import org.testcontainers.containers.MongoDBContainer;
26+
import org.testcontainers.junit.jupiter.Container;
27+
import org.testcontainers.junit.jupiter.Testcontainers;
28+
import org.testcontainers.utility.DockerImageName;
29+
30+
import org.springframework.batch.core.*;
31+
import org.springframework.batch.core.job.builder.JobBuilder;
32+
import org.springframework.batch.core.launch.JobLauncher;
33+
import org.springframework.batch.core.repository.JobRepository;
34+
import org.springframework.batch.core.step.builder.StepBuilder;
35+
import org.springframework.batch.repeat.RepeatStatus;
36+
import org.springframework.beans.factory.annotation.Autowired;
37+
import org.springframework.data.domain.Sort;
38+
import org.springframework.data.mongodb.MongoTransactionManager;
39+
import org.springframework.data.mongodb.core.MongoTemplate;
40+
import org.springframework.data.mongodb.core.index.Index;
41+
import org.springframework.test.context.DynamicPropertyRegistry;
42+
import org.springframework.test.context.DynamicPropertySource;
43+
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;
44+
45+
/**
46+
* @author Mahmoud Ben Hassine
47+
*/
48+
@Testcontainers(disabledWithoutDocker = true)
49+
@SpringJUnitConfig(MongoDBIntegrationTestConfiguration.class)
50+
public class MongoDBJobRestartIntegrationTests {
51+
52+
private static final DockerImageName MONGODB_IMAGE = DockerImageName.parse("mongo:8.0.1");
53+
54+
@Container
55+
public static MongoDBContainer mongodb = new MongoDBContainer(MONGODB_IMAGE);
56+
57+
@DynamicPropertySource
58+
static void setMongoDbConnectionString(DynamicPropertyRegistry registry) {
59+
registry.add("mongo.connectionString", mongodb::getConnectionString);
60+
}
61+
62+
@Autowired
63+
private MongoTemplate mongoTemplate;
64+
65+
@Autowired
66+
JobLauncher jobLauncher;
67+
68+
@Autowired
69+
JobRepository jobRepository;
70+
71+
@Autowired
72+
MongoTransactionManager transactionManager;
73+
74+
@BeforeEach
75+
public void setUp() {
76+
// collections
77+
mongoTemplate.createCollection("BATCH_JOB_INSTANCE");
78+
mongoTemplate.createCollection("BATCH_JOB_EXECUTION");
79+
mongoTemplate.createCollection("BATCH_STEP_EXECUTION");
80+
// sequences
81+
mongoTemplate.createCollection("BATCH_SEQUENCES");
82+
mongoTemplate.getCollection("BATCH_SEQUENCES")
83+
.insertOne(new Document(Map.of("_id", "BATCH_JOB_INSTANCE_SEQ", "count", 0L)));
84+
mongoTemplate.getCollection("BATCH_SEQUENCES")
85+
.insertOne(new Document(Map.of("_id", "BATCH_JOB_EXECUTION_SEQ", "count", 0L)));
86+
mongoTemplate.getCollection("BATCH_SEQUENCES")
87+
.insertOne(new Document(Map.of("_id", "BATCH_STEP_EXECUTION_SEQ", "count", 0L)));
88+
// indices
89+
mongoTemplate.indexOps("BATCH_JOB_INSTANCE")
90+
.ensureIndex(new Index().on("jobName", Sort.Direction.ASC).named("job_name_idx"));
91+
mongoTemplate.indexOps("BATCH_JOB_INSTANCE")
92+
.ensureIndex(new Index().on("jobName", Sort.Direction.ASC)
93+
.on("jobKey", Sort.Direction.ASC)
94+
.named("job_name_key_idx"));
95+
mongoTemplate.indexOps("BATCH_JOB_INSTANCE")
96+
.ensureIndex(new Index().on("jobInstanceId", Sort.Direction.DESC).named("job_instance_idx"));
97+
mongoTemplate.indexOps("BATCH_JOB_EXECUTION")
98+
.ensureIndex(new Index().on("jobInstanceId", Sort.Direction.ASC).named("job_instance_idx"));
99+
mongoTemplate.indexOps("BATCH_JOB_EXECUTION")
100+
.ensureIndex(new Index().on("jobInstanceId", Sort.Direction.ASC)
101+
.on("status", Sort.Direction.ASC)
102+
.named("job_instance_status_idx"));
103+
mongoTemplate.indexOps("BATCH_STEP_EXECUTION")
104+
.ensureIndex(new Index().on("stepExecutionId", Sort.Direction.ASC).named("step_execution_idx"));
105+
}
106+
107+
@Test
108+
void testJobExecutionRestart() throws Exception {
109+
// given
110+
Job job = new JobBuilder("job", jobRepository)
111+
.start(new StepBuilder("step", jobRepository).tasklet((contribution, chunkContext) -> {
112+
boolean shouldFail = (boolean) chunkContext.getStepContext().getJobParameters().get("shouldfail");
113+
if (shouldFail) {
114+
throw new RuntimeException("Step failed");
115+
}
116+
return RepeatStatus.FINISHED;
117+
}, transactionManager).build())
118+
.build();
119+
JobParameters jobParameters = new JobParametersBuilder().addString("name", "foo")
120+
// shouldfail is non identifying => no effect on job instance identity
121+
.addJobParameter("shouldfail", true, Boolean.class, false)
122+
.toJobParameters();
123+
124+
// First run - expected to fail
125+
JobExecution jobExecution1 = jobLauncher.run(job, jobParameters);
126+
Assertions.assertEquals(ExitStatus.FAILED.getExitCode(), jobExecution1.getExitStatus().getExitCode());
127+
128+
// Second run - expected to fail again
129+
JobExecution jobExecution2 = jobLauncher.run(job, jobParameters);
130+
Assertions.assertEquals(ExitStatus.FAILED.getExitCode(), jobExecution2.getExitStatus().getExitCode());
131+
132+
// Third run - expected to succeed
133+
jobParameters = new JobParametersBuilder().addString("name", "foo")
134+
.addJobParameter("shouldfail", false, Boolean.class, false)
135+
.toJobParameters();
136+
JobExecution jobExecution3 = jobLauncher.run(job, jobParameters);
137+
Assertions.assertEquals(ExitStatus.COMPLETED, jobExecution3.getExitStatus());
138+
139+
MongoCollection<Document> jobInstancesCollection = mongoTemplate.getCollection("BATCH_JOB_INSTANCE");
140+
MongoCollection<Document> jobExecutionsCollection = mongoTemplate.getCollection("BATCH_JOB_EXECUTION");
141+
MongoCollection<Document> stepExecutionsCollection = mongoTemplate.getCollection("BATCH_STEP_EXECUTION");
142+
143+
Assertions.assertEquals(1, jobInstancesCollection.countDocuments());
144+
Assertions.assertEquals(3, jobExecutionsCollection.countDocuments());
145+
Assertions.assertEquals(3, stepExecutionsCollection.countDocuments());
146+
147+
JobInstance jobInstance = jobRepository.getJobInstance("job", jobParameters);
148+
Assertions.assertNotNull(jobInstance);
149+
StepExecution lastStepExecution = jobRepository.getLastStepExecution(jobInstance, "step");
150+
Assertions.assertNotNull(lastStepExecution);
151+
Assertions.assertEquals(3, lastStepExecution.getId());
152+
}
153+
154+
}

0 commit comments

Comments
 (0)