Skip to content

Commit 4cab990

Browse files
committed
#747 | /api/subjectMigration/bulk failures to s3
1 parent 64e58e4 commit 4cab990

File tree

6 files changed

+64
-36
lines changed

6 files changed

+64
-36
lines changed

avni-server-api/src/main/java/org/avni/server/importer/batch/sync/attributes/bulkmigration/BulkSubjectMigrationJobListener.java

Lines changed: 33 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,10 @@
11
package org.avni.server.importer.batch.sync.attributes.bulkmigration;
22

3+
import com.fasterxml.jackson.databind.ObjectMapper;
34
import org.avni.server.framework.security.AuthService;
4-
import org.avni.server.web.request.SubjectMigrationRequest;
5+
import org.avni.server.service.BulkUploadS3Service;
6+
import org.avni.server.util.ObjectMapperSingleton;
7+
import org.avni.server.web.request.BulkSubjectMigrationRequest;
58
import org.slf4j.Logger;
69
import org.slf4j.LoggerFactory;
710
import org.springframework.batch.core.JobExecution;
@@ -11,11 +14,18 @@
1114
import org.springframework.beans.factory.annotation.Value;
1215
import org.springframework.stereotype.Component;
1316

17+
import java.io.File;
18+
import java.io.IOException;
19+
import java.util.Map;
20+
21+
import static java.lang.String.format;
22+
1423
@Component
1524
@JobScope
1625
public class BulkSubjectMigrationJobListener extends JobExecutionListenerSupport {
1726
private static final Logger logger = LoggerFactory.getLogger(BulkSubjectMigrationJobListener.class);
1827
private final AuthService authService;
28+
private final BulkUploadS3Service s3Service;
1929

2030
@Value("#{jobParameters['uuid']}")
2131
private String uuid;
@@ -29,12 +39,16 @@ public class BulkSubjectMigrationJobListener extends JobExecutionListenerSupport
2939
@Value("#{jobParameters['userId']}")
3040
private Long userId;
3141

42+
@Value("#{jobParameters['fileName']}")
43+
private String fileName;
44+
3245
@Value("#{jobParameters['bulkSubjectMigrationParameters']}")
33-
private SubjectMigrationRequest bulkSubjectMigrationParameters;
46+
private BulkSubjectMigrationRequest bulkSubjectMigrationParameters;
3447

3548
@Autowired
36-
public BulkSubjectMigrationJobListener(AuthService authService) {
49+
public BulkSubjectMigrationJobListener(AuthService authService, BulkUploadS3Service s3Service) {
3750
this.authService = authService;
51+
this.s3Service = s3Service;
3852
}
3953

4054
@Override
@@ -45,7 +59,21 @@ public void beforeJob(JobExecution jobExecution) {
4559

4660
@Override
4761
public void afterJob(JobExecution jobExecution) {
48-
logger.info("Finished Bulk Subject Migration Job {} mode: {} exitStatus: {} createTime: {} startTime: {} endTime: {}",
49-
uuid, mode, jobExecution.getExitStatus(), jobExecution.getCreateTime(), jobExecution.getStartTime(), jobExecution.getEndTime());
62+
Map<String, String> failedMigrations = (Map<String, String>) jobExecution.getExecutionContext().get("failedMigrations");
63+
logger.info("Finished Bulk Subject Migration Job {} mode: {} failedCount: {} exitStatus: {} waitTime: {}ms processingTime: {}ms fileName: {}",
64+
uuid, mode, failedMigrations.size(), jobExecution.getExitStatus(), jobExecution.getStartTime().getTime() - jobExecution.getCreateTime().getTime(), jobExecution.getEndTime().getTime() - jobExecution.getStartTime().getTime(), fileName);
65+
try {
66+
writeFailuresToFileAndUploadToS3(failedMigrations);
67+
} catch (Exception e) {
68+
logger.error("Failed to write bulk subject migration failures to file and upload", e);
69+
}
70+
71+
}
72+
73+
private void writeFailuresToFileAndUploadToS3(Map<String, String> failedMigrations) throws IOException {
74+
ObjectMapper objectMapper = ObjectMapperSingleton.getObjectMapper();
75+
File failedMigrationsFile = new File(format("%s/%s", System.getProperty("java.io.tmpdir"), fileName));
76+
objectMapper.writerWithDefaultPrettyPrinter().writeValue(failedMigrationsFile, failedMigrations);
77+
s3Service.uploadFile(failedMigrationsFile, fileName, "bulksubjectmigrations");
5078
}
5179
}
Lines changed: 6 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,8 @@
11
package org.avni.server.importer.batch.sync.attributes.bulkmigration;
22

3-
import com.fasterxml.jackson.databind.ObjectMapper;
4-
import org.avni.server.service.S3Service;
3+
import org.avni.server.service.BulkUploadS3Service;
54
import org.avni.server.service.SubjectMigrationService;
6-
import org.avni.server.util.ObjectMapperSingleton;
7-
import org.avni.server.web.request.SubjectMigrationRequest;
5+
import org.avni.server.web.request.BulkSubjectMigrationRequest;
86
import org.slf4j.Logger;
97
import org.slf4j.LoggerFactory;
108
import org.springframework.batch.core.StepContribution;
@@ -16,39 +14,32 @@
1614
import org.springframework.beans.factory.annotation.Value;
1715
import org.springframework.stereotype.Component;
1816

19-
import java.io.File;
2017
import java.util.Map;
2118

2219
@Component
2320
@JobScope
2421
public class BulkSubjectMigrationTasklet implements Tasklet {
2522
private static final Logger logger = LoggerFactory.getLogger(BulkSubjectMigrationTasklet.class);
2623
private final SubjectMigrationService subjectMigrationService;
27-
private final S3Service s3Service;
24+
2825
@Value("#{jobParameters['uuid']}")
2926
String uuid;
3027

3128
@Value("#{jobParameters['mode']}")
3229
String mode;
3330

3431
@Value("#{jobParameters['bulkSubjectMigrationParameters']}")
35-
SubjectMigrationRequest bulkSubjectMigrationParameters;
32+
BulkSubjectMigrationRequest bulkSubjectMigrationParameters;
3633

3734
@Autowired
38-
public BulkSubjectMigrationTasklet(SubjectMigrationService subjectMigrationService, S3Service s3Service) {
35+
public BulkSubjectMigrationTasklet(SubjectMigrationService subjectMigrationService, BulkUploadS3Service s3Service) {
3936
this.subjectMigrationService = subjectMigrationService;
40-
this.s3Service = s3Service;
4137
}
4238

4339
@Override
4440
public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception {
4541
Map<String, String> failedMigrations = subjectMigrationService.bulkMigrate(SubjectMigrationService.BulkSubjectMigrationModes.valueOf(mode), bulkSubjectMigrationParameters);
46-
ObjectMapper objectMapper = ObjectMapperSingleton.getObjectMapper();
47-
String fileName = uuid + ".json";
48-
File failedMigrationsFile = new File("/tmp/" + fileName);
49-
objectMapper.writerWithDefaultPrettyPrinter().writeValue(failedMigrationsFile, failedMigrations);
50-
// TODO upload file to S3
51-
// s3Service.uploadFile(failedMigrationsFile, fileName, "bulkuploads/subjectmigrations");
42+
chunkContext.getStepContext().getStepExecution().getJobExecution().getExecutionContext().put("failedMigrations", failedMigrations);
5243
return RepeatStatus.FINISHED;
5344
}
5445
}

avni-server-api/src/main/java/org/avni/server/service/BulkUploadS3Service.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,10 @@ public ObjectInfo uploadErrorFile(File tempSourceFile, String uuid) throws IOExc
3737
return s3Service.uploadFile(tempSourceFile, format("%s.csv", uuid), "bulkuploads/error");
3838
}
3939

40+
public ObjectInfo uploadFile(File localFile, String filename, String directory) throws IOException {
41+
return s3Service.uploadFile(localFile, filename, directory);
42+
}
43+
4044
public File getLocalErrorFile(String uuid) {
4145
File errorDir = new File(format("%s/bulkuploads/error", System.getProperty("java.io.tmpdir")));
4246
errorDir.mkdirs();

avni-server-api/src/main/java/org/avni/server/service/SubjectMigrationService.java

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
import org.avni.server.framework.security.UserContextHolder;
99
import org.avni.server.service.accessControl.AccessControlService;
1010
import org.avni.server.web.IndividualController;
11-
import org.avni.server.web.request.SubjectMigrationRequest;
11+
import org.avni.server.web.request.BulkSubjectMigrationRequest;
1212
import org.joda.time.DateTime;
1313
import org.slf4j.LoggerFactory;
1414
import org.springframework.beans.factory.annotation.Autowired;
@@ -155,11 +155,11 @@ public void changeSubjectSyncConceptValues(Individual subject, String destinatio
155155
individualService.save(subject);
156156
}
157157

158-
public Map<String, String> bulkMigrate(BulkSubjectMigrationModes mode, SubjectMigrationRequest subjectMigrationRequest) {
158+
public Map<String, String> bulkMigrate(BulkSubjectMigrationModes mode, BulkSubjectMigrationRequest bulkSubjectMigrationRequest) {
159159
if (mode == BulkSubjectMigrationModes.byAddress) {
160-
return bulkMigrateByAddress(subjectMigrationRequest.getSubjectIds(), subjectMigrationRequest.getDestinationAddresses());
160+
return bulkMigrateByAddress(bulkSubjectMigrationRequest.getSubjectIds(), bulkSubjectMigrationRequest.getDestinationAddresses());
161161
} else {
162-
return bulkMigrateBySyncConcept(subjectMigrationRequest.getSubjectIds(), subjectMigrationRequest.getDestinationSyncConcepts());
162+
return bulkMigrateBySyncConcept(bulkSubjectMigrationRequest.getSubjectIds(), bulkSubjectMigrationRequest.getDestinationSyncConcepts());
163163
}
164164
}
165165

@@ -219,18 +219,20 @@ public Map<String, String> bulkMigrateBySyncConcept(List<Long> subjectIds, Map<S
219219
}
220220

221221
private String validateSyncConcept(String subjectTypeSyncConceptUuid, String currentValue, Map<String, String> destinationSyncConcepts) {
222+
String destinationSyncConceptValue = destinationSyncConcepts.get(subjectTypeSyncConceptUuid);
223+
if (subjectTypeSyncConceptUuid != null && destinationSyncConceptValue == null) {
224+
return null; // No migration required for this sync concept.
225+
}
222226
if (subjectTypeSyncConceptUuid == null) {
223227
throw new RuntimeException("No sync concept configured for subject type");
224228
}
225229
Concept syncConcept = conceptRepository.findByUuid(subjectTypeSyncConceptUuid);
226230

227-
String destinationSyncConceptValue = destinationSyncConcepts.get(subjectTypeSyncConceptUuid);
228-
229231
if (Objects.equals(currentValue, destinationSyncConceptValue)) {
230232
throw new RuntimeException("Source value and Destination value are the same");
231233
}
232234

233-
if (destinationSyncConceptValue != null && syncConcept.isCoded()) {
235+
if (syncConcept.isCoded()) {
234236
ConceptAnswer conceptAnswer = syncConcept.findConceptAnswerByConceptUUID(destinationSyncConceptValue);
235237
if (conceptAnswer == null || conceptAnswer.isVoided()) {
236238
throw new RuntimeException(String.format("Invalid value '%s' for coded sync concept", destinationSyncConceptValue));

avni-server-api/src/main/java/org/avni/server/web/SubjectMigrationController.java

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111
import org.avni.server.service.UserService;
1212
import org.avni.server.service.accessControl.AccessControlService;
1313
import org.avni.server.util.BadRequestError;
14-
import org.avni.server.web.request.SubjectMigrationRequest;
14+
import org.avni.server.web.request.BulkSubjectMigrationRequest;
1515
import org.avni.server.web.response.slice.SlicedResources;
1616
import org.joda.time.DateTime;
1717
import org.slf4j.Logger;
@@ -38,6 +38,7 @@
3838
import java.util.Collections;
3939
import java.util.UUID;
4040

41+
import static java.lang.String.format;
4142
import static org.avni.server.web.resourceProcessors.ResourceProcessor.addAuditFields;
4243

4344
@RestController
@@ -116,29 +117,31 @@ public Resource<SubjectMigration> process(Resource<SubjectMigration> resource) {
116117
@RequestMapping(value = "/api/subjectMigration/bulk", method = RequestMethod.POST)
117118
@PreAuthorize(value = "hasAnyAuthority('user')")
118119
public ResponseEntity migrate(@RequestParam(value = "mode", defaultValue = "byAddress") SubjectMigrationService.BulkSubjectMigrationModes mode,
119-
@RequestBody SubjectMigrationRequest subjectMigrationRequest) {
120+
@RequestBody BulkSubjectMigrationRequest bulkSubjectMigrationRequest) {
120121
accessControlService.checkPrivilege(PrivilegeType.MultiTxEntityTypeUpdate);
121-
if (subjectMigrationRequest.getSubjectIds() == null) {
122+
if (bulkSubjectMigrationRequest.getSubjectIds() == null) {
122123
throw new BadRequestError("subjectIds is required");
123124
}
124-
if (mode == SubjectMigrationService.BulkSubjectMigrationModes.byAddress && subjectMigrationRequest.getDestinationAddresses() == null) {
125+
if (mode == SubjectMigrationService.BulkSubjectMigrationModes.byAddress && bulkSubjectMigrationRequest.getDestinationAddresses() == null) {
125126
throw new BadRequestError("destinationAddresses is required for mode: byAddress");
126127
}
127-
if (mode == SubjectMigrationService.BulkSubjectMigrationModes.bySyncConcept && subjectMigrationRequest.getDestinationSyncConcepts() == null) {
128+
if (mode == SubjectMigrationService.BulkSubjectMigrationModes.bySyncConcept && bulkSubjectMigrationRequest.getDestinationSyncConcepts() == null) {
128129
throw new BadRequestError("destinationSyncConcepts is required for mode: bySyncConcepts");
129130
}
130131

131132
UserContext userContext = UserContextHolder.getUserContext();
132133
User user = userContext.getUser();
133134
Organisation organisation = userContext.getOrganisation();
134135
String jobUUID = UUID.randomUUID().toString();
136+
String fileName = format("%s-%s-%s.%s", jobUUID, mode, user.getUsername(), "json");
135137
JobParameters jobParameters =
136138
new JobParametersBuilder()
137139
.addString("uuid", jobUUID)
138140
.addString("organisationUUID", organisation.getUuid())
139141
.addLong("userId", user.getId(), false)
140142
.addString("mode", String.valueOf(mode))
141-
.addParameter("bulkSubjectMigrationParameters", new CustomJobParameter<>(subjectMigrationRequest))
143+
.addString("fileName", fileName)
144+
.addParameter("bulkSubjectMigrationParameters", new CustomJobParameter<>(bulkSubjectMigrationRequest))
142145
.toJobParameters();
143146

144147
try {
@@ -148,7 +151,7 @@ public ResponseEntity migrate(@RequestParam(value = "mode", defaultValue = "byAd
148151
throw new RuntimeException(String.format("Error while starting the bulk subject migration job, %s", e.getMessage()), e);
149152
}
150153

151-
return ResponseEntity.status(HttpStatus.ACCEPTED).body(jobUUID);
154+
return ResponseEntity.status(HttpStatus.ACCEPTED).body(migrationStatus(jobUUID));
152155
}
153156

154157
@RequestMapping(value = "/api/subjectMigration/bulk/status/{jobUuid}", method = RequestMethod.GET)
@@ -157,6 +160,6 @@ public JobStatus migrationStatus(@PathVariable("jobUuid") String jobUuid) {
157160
accessControlService.checkPrivilege(PrivilegeType.MultiTxEntityTypeUpdate);
158161
String jobFilterCondition = " and uuid = '" + jobUuid + "'";
159162
Page<JobStatus> jobStatuses = avniJobRepository.getJobStatuses(UserContextHolder.getUser(), jobFilterCondition, PageRequest.of(0, 1));
160-
return jobStatuses != null ? jobStatuses.getContent().get(0) : null;
163+
return (jobStatuses != null && !jobStatuses.getContent().isEmpty()) ? jobStatuses.getContent().get(0) : null;
161164
}
162165
}

avni-server-api/src/main/java/org/avni/server/web/request/SubjectMigrationRequest.java renamed to avni-server-api/src/main/java/org/avni/server/web/request/BulkSubjectMigrationRequest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
import java.util.List;
55
import java.util.Map;
66

7-
public class SubjectMigrationRequest implements Serializable {
7+
public class BulkSubjectMigrationRequest implements Serializable {
88
private List<Long> subjectIds;
99
private Map<String, String> destinationAddresses;
1010
private Map<String, String> destinationSyncConcepts;

0 commit comments

Comments
 (0)