From acfad989fa0e20114c8e2fd1b2471daf2233fc1b Mon Sep 17 00:00:00 2001 From: Zlatin Todorinski Date: Thu, 29 Oct 2020 10:29:11 +0100 Subject: [PATCH 1/4] Added service to handle the health validation execution cycles --- .../context/service-context.xml | 4 + .../platform-healthProcessor-context.xml | 1 + .../xenit/alfresco/processor/model/Cycle.java | 19 +++ .../processor/service/CycleService.java | 134 ++++++++++++++++++ .../processor/service/ProcessorService.java | 8 +- .../service/ProcessorServiceTest.java | 12 +- 6 files changed, 172 insertions(+), 6 deletions(-) create mode 100644 alfresco-health-processor-platform/src/main/java/eu/xenit/alfresco/processor/model/Cycle.java create mode 100644 alfresco-health-processor-platform/src/main/java/eu/xenit/alfresco/processor/service/CycleService.java diff --git a/alfresco-health-processor-platform/src/main/amp/config/alfresco/module/alfresco-health-processor/context/service-context.xml b/alfresco-health-processor-platform/src/main/amp/config/alfresco/module/alfresco-health-processor/context/service-context.xml index dc353bbe..724d3e4a 100644 --- a/alfresco-health-processor-platform/src/main/amp/config/alfresco/module/alfresco-health-processor/context/service-context.xml +++ b/alfresco-health-processor-platform/src/main/amp/config/alfresco/module/alfresco-health-processor/context/service-context.xml @@ -12,4 +12,8 @@ + + + \ No newline at end of file diff --git a/alfresco-health-processor-platform/src/main/amp/config/alfresco/subsystems/healthProcessor/platform/platform-healthProcessor-context.xml b/alfresco-health-processor-platform/src/main/amp/config/alfresco/subsystems/healthProcessor/platform/platform-healthProcessor-context.xml index dd443333..bb5d86b5 100644 --- a/alfresco-health-processor-platform/src/main/amp/config/alfresco/subsystems/healthProcessor/platform/platform-healthProcessor-context.xml +++ b/alfresco-health-processor-platform/src/main/amp/config/alfresco/subsystems/healthProcessor/platform/platform-healthProcessor-context.xml @@ -8,5 +8,6 @@ + diff --git a/alfresco-health-processor-platform/src/main/java/eu/xenit/alfresco/processor/model/Cycle.java b/alfresco-health-processor-platform/src/main/java/eu/xenit/alfresco/processor/model/Cycle.java new file mode 100644 index 00000000..7a2a39eb --- /dev/null +++ b/alfresco-health-processor-platform/src/main/java/eu/xenit/alfresco/processor/model/Cycle.java @@ -0,0 +1,19 @@ +package eu.xenit.alfresco.processor.model; + +import lombok.AllArgsConstructor; +import lombok.Data; + +/** + * Information object containing the parameters for the tracking activity. + */ +@Data +@AllArgsConstructor +public class Cycle { + private int txnLimit; + private long firstTxn; + private int timeIncrementSeconds; + private long firstCommitTime; + + private long transactionId; + private long commitTimeMs; +} diff --git a/alfresco-health-processor-platform/src/main/java/eu/xenit/alfresco/processor/service/CycleService.java b/alfresco-health-processor-platform/src/main/java/eu/xenit/alfresco/processor/service/CycleService.java new file mode 100644 index 00000000..a3edbd5b --- /dev/null +++ b/alfresco-health-processor-platform/src/main/java/eu/xenit/alfresco/processor/service/CycleService.java @@ -0,0 +1,134 @@ +package eu.xenit.alfresco.processor.service; + +import eu.xenit.alfresco.processor.model.Cycle; +import lombok.AllArgsConstructor; +import org.alfresco.repo.solr.Transaction; +import org.alfresco.repo.transaction.RetryingTransactionHelper; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.time.Instant; +import java.time.LocalDateTime; +import java.time.OffsetDateTime; +import java.time.ZoneId; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; + +@AllArgsConstructor +public class CycleService { + private static final Logger logger = LoggerFactory.getLogger(CycleService.class); + + protected final RetryingTransactionHelper retryingTransactionHelper; + + public void execute(HealthProcessorConfiguration configurationService) { + Cycle cycle = createCycle( configurationService); + AtomicBoolean continueCycle = new AtomicBoolean(); + run(configurationService, cycle, continueCycle); + while(configurationService.isEnabled() + && !configurationService.isRunOnce() + && continueCycle.get()) { + run(configurationService, cycle, continueCycle); + } + } + + Cycle createCycle(HealthProcessorConfiguration configurationService) { + return new Cycle( + configurationService.getTransactionLimit(), + configurationService.getFirstTransaction(), + configurationService.getTimeIncrementSeconds(), + configurationService.getFirstCommitTime(), + configurationService.getFirstTransaction(), + configurationService.getFirstCommitTime() + ); + } + + void run(HealthProcessorConfiguration configurationService, Cycle cycle, AtomicBoolean continueCycle) { + start(cycle); + + AtomicBoolean reachedMaxTx = new AtomicBoolean(false); + retryingTransactionHelper.doInTransaction(() -> { + reachedMaxTx.set(reachedLastTx(cycle.getTransactionId())); + return null; + },false, true); + + if(reachedMaxTx.get()) { + try { + logger.error("Max transaction reached, entering idle state"); + Thread.sleep(configurationService.getTimeIncrementSeconds() * 1000); + } catch (InterruptedException e) { + logger.error("Idling has failed, aborting...", e); + continueCycle.set(false); + } + } + } + + void start(Cycle cycle) { + int txnLimit = cycle.getTxnLimit(); + int timeIncrementSeconds = cycle.getTimeIncrementSeconds(); + long firstCommitTime = cycle.getFirstCommitTime(); + + logger.debug("Tracking changes ... Start commit time: {}", LocalDateTime + .ofInstant(Instant.ofEpochMilli(firstCommitTime), ZoneId.systemDefault()) + .toString()); + + processTxnRange(cycle, txnLimit, timeIncrementSeconds); + + long timeIncrementEpoch = timeIncrementSeconds * 1000L; + while(txnHistoryIsCatchingUp(timeIncrementEpoch, cycle.getCommitTimeMs())) { + cycle.setCommitTimeMs(cycle.getCommitTimeMs() + timeIncrementEpoch); + processTxnRange(cycle, txnLimit, timeIncrementSeconds); + } + } + + void processTxnRange(Cycle cycle, int txnLimit, int timeIncrementSeconds) { + // Save current progress in case of transactions collection failure + long maxTxId = cycle.getTransactionId(); + long maxCommitTimeMs = cycle.getCommitTimeMs(); + + try { + List txs = getNodeTransactions( + txnLimit, timeIncrementSeconds); + + logger.debug("Found {} transactions", txs.size()); + + if(txs.size() > 0) { + // Yay, we have a list of transactions to process! Save them for later + maxCommitTimeMs = txs.stream().map(Transaction::getCommitTimeMs) + .max(Long::compare).get(); + maxTxId = txs.stream().map(Transaction::getId) + .max(Long::compare).get(); + + for(Transaction tx : txs) { + try { + retryingTransactionHelper.doInTransaction(() -> { + // do work + return null; + }, true, false); + } catch (Exception e) { + logger.error("Tracker loop failed: " + e.getMessage(), e); + } + } + } + + cycle.setTransactionId(maxTxId); + cycle.setCommitTimeMs( Long.max(maxCommitTimeMs, cycle.getCommitTimeMs())); + + } catch (Exception ex) { + logger.error("Impossible to read tracker info: " + ex.getMessage(), ex); + } + } + + private boolean txnHistoryIsCatchingUp(long timeIncrementEpoch, long commitTimeMs) { + long supposedLastScanTime = OffsetDateTime.now().toInstant().toEpochMilli() - timeIncrementEpoch; + return supposedLastScanTime > commitTimeMs; + } + + private boolean reachedLastTx(long transactionId) { + return transactionId < 100_000; + } + + private List getNodeTransactions(int txnLimit, int timeIncrementSeconds) { + return new ArrayList<>(); + } +} diff --git a/alfresco-health-processor-platform/src/main/java/eu/xenit/alfresco/processor/service/ProcessorService.java b/alfresco-health-processor-platform/src/main/java/eu/xenit/alfresco/processor/service/ProcessorService.java index 4b3242ed..0e863148 100644 --- a/alfresco-health-processor-platform/src/main/java/eu/xenit/alfresco/processor/service/ProcessorService.java +++ b/alfresco-health-processor-platform/src/main/java/eu/xenit/alfresco/processor/service/ProcessorService.java @@ -15,6 +15,7 @@ public class ProcessorService { protected final ExecutorService executorService; protected final HealthProcessorConfiguration configuration; protected final ProcessorAttributeService processorAttributeService; + protected final CycleService cycleService; public void validateHealth() { if(!configuration.isEnabled()) { @@ -34,8 +35,11 @@ public void validateHealth() { () -> processorAttributeService .persistAttribute(ProcessorAttributeService.ATTR_KEY_IS_RUNNING, true), false, true); - // do work - processorAttributeService.cleanupAttributes(); + try { + this.cycleService.execute(configuration); + } finally { + processorAttributeService.cleanupAttributes(); + } }); } diff --git a/alfresco-health-processor-platform/src/test/java/eu/xenit/alfresco/processor/service/ProcessorServiceTest.java b/alfresco-health-processor-platform/src/test/java/eu/xenit/alfresco/processor/service/ProcessorServiceTest.java index 1fdf4fb5..1f25610f 100644 --- a/alfresco-health-processor-platform/src/test/java/eu/xenit/alfresco/processor/service/ProcessorServiceTest.java +++ b/alfresco-health-processor-platform/src/test/java/eu/xenit/alfresco/processor/service/ProcessorServiceTest.java @@ -30,7 +30,8 @@ public void validateHealthWhenDisabledTest() { ProcessorService processorService = createNewProcessorService( executorService, configurationService, - processorAttributeService); + processorAttributeService, + null); processorService.validateHealth(); verify(processorAttributeService, never()) .getAttribute(anyString(), anyBoolean()); @@ -58,7 +59,8 @@ public void validateHealthWhenEnabledTest() { ProcessorService processorService = createNewProcessorService( executorService, configurationService, - processorAttributeService); + processorAttributeService, + null); processorService.validateHealth(); assertTrue(executed.get()); } @@ -66,12 +68,14 @@ public void validateHealthWhenEnabledTest() { private ProcessorService createNewProcessorService( ExecutorService executorService, HealthProcessorConfiguration configuration, - ProcessorAttributeService processorAttributeService) { + ProcessorAttributeService processorAttributeService, + CycleService cycleService) { return new ProcessorService( null, executorService, configuration, - processorAttributeService + processorAttributeService, + cycleService ); } } From f54f7c4bd59b9e1a6329c919e86ccf2bd84dbbc3 Mon Sep 17 00:00:00 2001 From: Zlatin Todorinski Date: Thu, 29 Oct 2020 10:53:48 +0100 Subject: [PATCH 2/4] Converted some of the local state to a state object TrackerInfo; Introduced service to centralize the state management logic for TrackerInfo --- .../context/search-context.xml | 58 +++++++++++++++ .../context/service-context.xml | 5 ++ .../module-context.xml | 1 + .../xenit/alfresco/processor/model/Cycle.java | 4 +- .../alfresco/processor/model/TrackerInfo.java | 29 ++++++++ .../processor/service/CycleService.java | 69 ++++++++++-------- .../processor/service/ProgressTracker.java | 53 ++++++++++++++ .../service/ProgressTrackerTest.java | 72 +++++++++++++++++++ 8 files changed, 259 insertions(+), 32 deletions(-) create mode 100644 alfresco-health-processor-platform/src/main/amp/config/alfresco/module/alfresco-health-processor/context/search-context.xml create mode 100644 alfresco-health-processor-platform/src/main/java/eu/xenit/alfresco/processor/model/TrackerInfo.java create mode 100644 alfresco-health-processor-platform/src/main/java/eu/xenit/alfresco/processor/service/ProgressTracker.java create mode 100644 alfresco-health-processor-platform/src/test/java/eu/xenit/alfresco/processor/service/ProgressTrackerTest.java diff --git a/alfresco-health-processor-platform/src/main/amp/config/alfresco/module/alfresco-health-processor/context/search-context.xml b/alfresco-health-processor-platform/src/main/amp/config/alfresco/module/alfresco-health-processor/context/search-context.xml new file mode 100644 index 00000000..3716f838 --- /dev/null +++ b/alfresco-health-processor-platform/src/main/amp/config/alfresco/module/alfresco-health-processor/context/search-context.xml @@ -0,0 +1,58 @@ + + + + + + + + + + + + ${search.solrTrackingSupport.ignorePathsForSpecificTypes:false} + + + + {http://www.alfresco.org/model/content/1.0}person + {http://www.alfresco.org/model/application/1.0}configurations + {http://www.alfresco.org/model/content/1.0}authorityContainer + + + + + + + ${search.solrTrackingSupport.ignorePathsForSpecificAspects:false} + + + + + + + + + + + + + + + + + + ${eu.xenit.alfresco.processor.tracking.enabled:true} + + + + + + \ No newline at end of file diff --git a/alfresco-health-processor-platform/src/main/amp/config/alfresco/module/alfresco-health-processor/context/service-context.xml b/alfresco-health-processor-platform/src/main/amp/config/alfresco/module/alfresco-health-processor/context/service-context.xml index 724d3e4a..c51c4cfd 100644 --- a/alfresco-health-processor-platform/src/main/amp/config/alfresco/module/alfresco-health-processor/context/service-context.xml +++ b/alfresco-health-processor-platform/src/main/amp/config/alfresco/module/alfresco-health-processor/context/service-context.xml @@ -12,8 +12,13 @@ + + + + \ No newline at end of file diff --git a/alfresco-health-processor-platform/src/main/amp/config/alfresco/module/alfresco-health-processor/module-context.xml b/alfresco-health-processor-platform/src/main/amp/config/alfresco/module/alfresco-health-processor/module-context.xml index c348be71..b4ce3ea7 100644 --- a/alfresco-health-processor-platform/src/main/amp/config/alfresco/module/alfresco-health-processor/module-context.xml +++ b/alfresco-health-processor-platform/src/main/amp/config/alfresco/module/alfresco-health-processor/module-context.xml @@ -3,6 +3,7 @@ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd"> + diff --git a/alfresco-health-processor-platform/src/main/java/eu/xenit/alfresco/processor/model/Cycle.java b/alfresco-health-processor-platform/src/main/java/eu/xenit/alfresco/processor/model/Cycle.java index 7a2a39eb..446d7fae 100644 --- a/alfresco-health-processor-platform/src/main/java/eu/xenit/alfresco/processor/model/Cycle.java +++ b/alfresco-health-processor-platform/src/main/java/eu/xenit/alfresco/processor/model/Cycle.java @@ -13,7 +13,5 @@ public class Cycle { private long firstTxn; private int timeIncrementSeconds; private long firstCommitTime; - - private long transactionId; - private long commitTimeMs; + private TrackerInfo trackerInfo; } diff --git a/alfresco-health-processor-platform/src/main/java/eu/xenit/alfresco/processor/model/TrackerInfo.java b/alfresco-health-processor-platform/src/main/java/eu/xenit/alfresco/processor/model/TrackerInfo.java new file mode 100644 index 00000000..e1eb98f9 --- /dev/null +++ b/alfresco-health-processor-platform/src/main/java/eu/xenit/alfresco/processor/model/TrackerInfo.java @@ -0,0 +1,29 @@ +package eu.xenit.alfresco.processor.model; + +import lombok.Data; + +import java.time.OffsetDateTime; + +/** + * Information object to save the state of the tracking activity. The state is persisted to allow for the correct + * continuation after a restart of the system. + */ +@Data +public class TrackerInfo { + /** + * Timestamp of the last action + */ + private OffsetDateTime timestamp; + /** + * Id of the last transaction processed by the tracker + */ + private Long transactionId; + /** + * Highest commit time in ms of processed transaction + */ + private Long commitTimeMs = -1L; + /** + * Name of the artifact running this tracker software + */ + private String name; +} diff --git a/alfresco-health-processor-platform/src/main/java/eu/xenit/alfresco/processor/service/CycleService.java b/alfresco-health-processor-platform/src/main/java/eu/xenit/alfresco/processor/service/CycleService.java index a3edbd5b..5b0d9e39 100644 --- a/alfresco-health-processor-platform/src/main/java/eu/xenit/alfresco/processor/service/CycleService.java +++ b/alfresco-health-processor-platform/src/main/java/eu/xenit/alfresco/processor/service/CycleService.java @@ -1,6 +1,7 @@ package eu.xenit.alfresco.processor.service; import eu.xenit.alfresco.processor.model.Cycle; +import eu.xenit.alfresco.processor.model.TrackerInfo; import lombok.AllArgsConstructor; import org.alfresco.repo.solr.Transaction; import org.alfresco.repo.transaction.RetryingTransactionHelper; @@ -20,6 +21,7 @@ public class CycleService { private static final Logger logger = LoggerFactory.getLogger(CycleService.class); protected final RetryingTransactionHelper retryingTransactionHelper; + protected final ProgressTracker progressTracker; public void execute(HealthProcessorConfiguration configurationService) { Cycle cycle = createCycle( configurationService); @@ -33,22 +35,25 @@ public void execute(HealthProcessorConfiguration configurationService) { } Cycle createCycle(HealthProcessorConfiguration configurationService) { - return new Cycle( - configurationService.getTransactionLimit(), + TrackerInfo ti = progressTracker.getTrackerInfo( + configurationService.getFirstTransaction(), + configurationService.getFirstCommitTime() + ); + return new Cycle(configurationService.getTransactionLimit(), configurationService.getFirstTransaction(), configurationService.getTimeIncrementSeconds(), configurationService.getFirstCommitTime(), - configurationService.getFirstTransaction(), - configurationService.getFirstCommitTime() + ti ); } void run(HealthProcessorConfiguration configurationService, Cycle cycle, AtomicBoolean continueCycle) { - start(cycle); + cycle = start(cycle); + final TrackerInfo ti = cycle.getTrackerInfo(); AtomicBoolean reachedMaxTx = new AtomicBoolean(false); retryingTransactionHelper.doInTransaction(() -> { - reachedMaxTx.set(reachedLastTx(cycle.getTransactionId())); + reachedMaxTx.set(progressTracker.reachedLastTx(ti)); return null; },false, true); @@ -57,13 +62,14 @@ void run(HealthProcessorConfiguration configurationService, Cycle cycle, AtomicB logger.error("Max transaction reached, entering idle state"); Thread.sleep(configurationService.getTimeIncrementSeconds() * 1000); } catch (InterruptedException e) { - logger.error("Idling has failed, aborting...", e); + logger.error("Idling has failed, aborting..."); continueCycle.set(false); } } } - void start(Cycle cycle) { + Cycle start(Cycle cycle) { + TrackerInfo trackerInfo = cycle.getTrackerInfo(); int txnLimit = cycle.getTxnLimit(); int timeIncrementSeconds = cycle.getTimeIncrementSeconds(); long firstCommitTime = cycle.getFirstCommitTime(); @@ -72,22 +78,35 @@ void start(Cycle cycle) { .ofInstant(Instant.ofEpochMilli(firstCommitTime), ZoneId.systemDefault()) .toString()); - processTxnRange(cycle, txnLimit, timeIncrementSeconds); + trackerInfo = processTxnRange( + trackerInfo, txnLimit, timeIncrementSeconds); long timeIncrementEpoch = timeIncrementSeconds * 1000L; - while(txnHistoryIsCatchingUp(timeIncrementEpoch, cycle.getCommitTimeMs())) { - cycle.setCommitTimeMs(cycle.getCommitTimeMs() + timeIncrementEpoch); - processTxnRange(cycle, txnLimit, timeIncrementSeconds); + while(txnHistoryIsCatchingUp(timeIncrementEpoch, trackerInfo)) { + progressTracker.updateTrackerInfo( + trackerInfo, + trackerInfo.getTransactionId(), + trackerInfo.getCommitTimeMs() + timeIncrementEpoch); + trackerInfo = processTxnRange( + trackerInfo, txnLimit, timeIncrementSeconds); } + + cycle.setTrackerInfo(trackerInfo); + return cycle; + } + + private boolean txnHistoryIsCatchingUp(long timeIncrementEpoch, TrackerInfo trackerInfo) { + long supposedLastScanTime = OffsetDateTime.now().toInstant().toEpochMilli() - timeIncrementEpoch; + return supposedLastScanTime > trackerInfo.getCommitTimeMs(); } - void processTxnRange(Cycle cycle, int txnLimit, int timeIncrementSeconds) { + TrackerInfo processTxnRange(TrackerInfo trackerInfo, int txnLimit, int timeIncrementSeconds) { // Save current progress in case of transactions collection failure - long maxTxId = cycle.getTransactionId(); - long maxCommitTimeMs = cycle.getCommitTimeMs(); + long maxTxId = trackerInfo.getTransactionId(); + long maxCommitTimeMs = trackerInfo.getCommitTimeMs(); try { - List txs = getNodeTransactions( + List txs = getNodeTransactions(trackerInfo, txnLimit, timeIncrementSeconds); logger.debug("Found {} transactions", txs.size()); @@ -111,24 +130,16 @@ void processTxnRange(Cycle cycle, int txnLimit, int timeIncrementSeconds) { } } - cycle.setTransactionId(maxTxId); - cycle.setCommitTimeMs( Long.max(maxCommitTimeMs, cycle.getCommitTimeMs())); + progressTracker.updateTrackerInfo(trackerInfo, maxTxId, + Long.max(maxCommitTimeMs, trackerInfo.getCommitTimeMs())); } catch (Exception ex) { logger.error("Impossible to read tracker info: " + ex.getMessage(), ex); } + return trackerInfo; } - private boolean txnHistoryIsCatchingUp(long timeIncrementEpoch, long commitTimeMs) { - long supposedLastScanTime = OffsetDateTime.now().toInstant().toEpochMilli() - timeIncrementEpoch; - return supposedLastScanTime > commitTimeMs; - } - - private boolean reachedLastTx(long transactionId) { - return transactionId < 100_000; - } - - private List getNodeTransactions(int txnLimit, int timeIncrementSeconds) { + private List getNodeTransactions(TrackerInfo trackerInfo, int txnLimit, int timeIncrementSeconds) { return new ArrayList<>(); } -} +} \ No newline at end of file diff --git a/alfresco-health-processor-platform/src/main/java/eu/xenit/alfresco/processor/service/ProgressTracker.java b/alfresco-health-processor-platform/src/main/java/eu/xenit/alfresco/processor/service/ProgressTracker.java new file mode 100644 index 00000000..89a3c9f2 --- /dev/null +++ b/alfresco-health-processor-platform/src/main/java/eu/xenit/alfresco/processor/service/ProgressTracker.java @@ -0,0 +1,53 @@ +package eu.xenit.alfresco.processor.service; + +import eu.xenit.alfresco.processor.model.TrackerInfo; +import lombok.AllArgsConstructor; +import org.alfresco.repo.solr.SOLRTrackingComponent; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.time.LocalDate; +import java.time.OffsetDateTime; +import java.time.ZoneId; +import java.util.Objects; + +@AllArgsConstructor +public class ProgressTracker { + private static final Logger logger = LoggerFactory.getLogger(ProgressTracker.class); + + protected final SOLRTrackingComponent tracker; + + public void updateTrackerInfo(TrackerInfo trackerInfo, + final long maxTransactionId, final long maxCommitTime) { + trackerInfo.setTransactionId(maxTransactionId); + trackerInfo.setCommitTimeMs(maxCommitTime); + trackerInfo.setTimestamp(OffsetDateTime.now()); + trackerInfo.setName(this.getClass().getName()); + } + + public boolean reachedLastTx(TrackerInfo ti) { + boolean result = Objects.equals(ti.getTransactionId(), tracker.getMaxTxnId()); + logger.info("Reached Last: {} ; provided Tx: {} ; max tx: {}", + result, ti.getTransactionId(), tracker.getMaxTxnId()); + return result; + } + + public TrackerInfo getTrackerInfo(final long firstTxn, final long firstCommitTime) { + // If no progress is recorded, scan from the beginning + TrackerInfo trackerInfo = new TrackerInfo(); + + if(firstTxn > 0 && firstCommitTime > 0) { + updateTrackerInfo(trackerInfo, firstTxn - 1, firstCommitTime); + } else { + updateTrackerInfo(trackerInfo, + 1, + LocalDate.now() + .minusYears(20) + .atStartOfDay(ZoneId.systemDefault()) + .toInstant() + .toEpochMilli() + ); + } + return trackerInfo; + } +} diff --git a/alfresco-health-processor-platform/src/test/java/eu/xenit/alfresco/processor/service/ProgressTrackerTest.java b/alfresco-health-processor-platform/src/test/java/eu/xenit/alfresco/processor/service/ProgressTrackerTest.java new file mode 100644 index 00000000..c5eed900 --- /dev/null +++ b/alfresco-health-processor-platform/src/test/java/eu/xenit/alfresco/processor/service/ProgressTrackerTest.java @@ -0,0 +1,72 @@ +package eu.xenit.alfresco.processor.service; + +import eu.xenit.alfresco.processor.model.TrackerInfo; +import org.alfresco.repo.solr.SOLRTrackingComponent; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.Mockito.when; + +@ExtendWith(MockitoExtension.class) +public class ProgressTrackerTest { + @Mock + private SOLRTrackingComponent tracker; + + @Test + public void updateTrackerInfoTest() { + final long value = 123L; + final TrackerInfo trackerInfo = new TrackerInfo(); + new ProgressTracker(tracker) + .updateTrackerInfo(trackerInfo,value,value); + validateTrackerInfo(trackerInfo, value, value); + } + + @Test + public void createNewTrackerInfoTest() { + final long value = 234L; + final ProgressTracker progressTracker = new ProgressTracker(tracker); + final TrackerInfo trackerInfo = progressTracker + .getTrackerInfo(value, value); + validateTrackerInfo(trackerInfo, value - 1, value); + + // Idempotency test + final TrackerInfo newTrackerInfo = progressTracker + .getTrackerInfo(value, value); + validateTrackerInfo(newTrackerInfo, value - 1, value); + } + + @Test + public void reachedLastTxTest() { + final long value = 123L; + when(tracker.getMaxTxnId()).thenAnswer(invocation -> value); + final ProgressTracker progressTracker = new ProgressTracker(tracker); + final TrackerInfo trackerInfo = new TrackerInfo(); + trackerInfo.setTransactionId(value); + assertTrue(progressTracker.reachedLastTx(trackerInfo)); + } + + @Test + public void notReachedLastTxTest() { + when(tracker.getMaxTxnId()).thenAnswer(invocation -> 123L); + final ProgressTracker progressTracker = new ProgressTracker(tracker); + final TrackerInfo trackerInfo = new TrackerInfo(); + trackerInfo.setTransactionId(456L); + assertFalse(progressTracker.reachedLastTx(trackerInfo)); + } + + private void validateTrackerInfo(final TrackerInfo ti, final long txId, final long commitTimeMs) { + assertNotNull(ti.getName()); + assertNotNull(ti.getClass()); + assertNotNull(ti.getTimestamp()); + assertNotNull(ti.getTransactionId()); + assertNotNull(ti.getCommitTimeMs()); + assertEquals(txId, ti.getTransactionId()); + assertEquals(commitTimeMs, ti.getCommitTimeMs()); + } +} From 70d4775e6ca44a86c525b0176b07076847f96518 Mon Sep 17 00:00:00 2001 From: Zlatin Todorinski Date: Thu, 29 Oct 2020 11:44:06 +0100 Subject: [PATCH 3/4] Added NodeTxService implementation that uses SOLRTrackingComponent to fetch the node references based on transaction --- .../context/service-context.xml | 5 ++ .../processor/service/CycleService.java | 22 +++--- .../processor/service/NodeTxService.java | 72 +++++++++++++++++++ 3 files changed, 87 insertions(+), 12 deletions(-) create mode 100644 alfresco-health-processor-platform/src/main/java/eu/xenit/alfresco/processor/service/NodeTxService.java diff --git a/alfresco-health-processor-platform/src/main/amp/config/alfresco/module/alfresco-health-processor/context/service-context.xml b/alfresco-health-processor-platform/src/main/amp/config/alfresco/module/alfresco-health-processor/context/service-context.xml index c51c4cfd..d0cd28f4 100644 --- a/alfresco-health-processor-platform/src/main/amp/config/alfresco/module/alfresco-health-processor/context/service-context.xml +++ b/alfresco-health-processor-platform/src/main/amp/config/alfresco/module/alfresco-health-processor/context/service-context.xml @@ -16,9 +16,14 @@ class="eu.xenit.alfresco.processor.service.ProgressTracker"> + + + + \ No newline at end of file diff --git a/alfresco-health-processor-platform/src/main/java/eu/xenit/alfresco/processor/service/CycleService.java b/alfresco-health-processor-platform/src/main/java/eu/xenit/alfresco/processor/service/CycleService.java index 5b0d9e39..06987704 100644 --- a/alfresco-health-processor-platform/src/main/java/eu/xenit/alfresco/processor/service/CycleService.java +++ b/alfresco-health-processor-platform/src/main/java/eu/xenit/alfresco/processor/service/CycleService.java @@ -5,6 +5,7 @@ import lombok.AllArgsConstructor; import org.alfresco.repo.solr.Transaction; import org.alfresco.repo.transaction.RetryingTransactionHelper; +import org.alfresco.service.cmr.repository.NodeRef; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -12,7 +13,6 @@ import java.time.LocalDateTime; import java.time.OffsetDateTime; import java.time.ZoneId; -import java.util.ArrayList; import java.util.List; import java.util.concurrent.atomic.AtomicBoolean; @@ -22,6 +22,7 @@ public class CycleService { protected final RetryingTransactionHelper retryingTransactionHelper; protected final ProgressTracker progressTracker; + protected final NodeTxService nodeTxService; public void execute(HealthProcessorConfiguration configurationService) { Cycle cycle = createCycle( configurationService); @@ -48,7 +49,7 @@ Cycle createCycle(HealthProcessorConfiguration configurationService) { } void run(HealthProcessorConfiguration configurationService, Cycle cycle, AtomicBoolean continueCycle) { - cycle = start(cycle); + start(cycle); final TrackerInfo ti = cycle.getTrackerInfo(); AtomicBoolean reachedMaxTx = new AtomicBoolean(false); @@ -68,7 +69,7 @@ void run(HealthProcessorConfiguration configurationService, Cycle cycle, AtomicB } } - Cycle start(Cycle cycle) { + void start(Cycle cycle) { TrackerInfo trackerInfo = cycle.getTrackerInfo(); int txnLimit = cycle.getTxnLimit(); int timeIncrementSeconds = cycle.getTimeIncrementSeconds(); @@ -92,7 +93,6 @@ Cycle start(Cycle cycle) { } cycle.setTrackerInfo(trackerInfo); - return cycle; } private boolean txnHistoryIsCatchingUp(long timeIncrementEpoch, TrackerInfo trackerInfo) { @@ -106,8 +106,8 @@ TrackerInfo processTxnRange(TrackerInfo trackerInfo, int txnLimit, int timeIncre long maxCommitTimeMs = trackerInfo.getCommitTimeMs(); try { - List txs = getNodeTransactions(trackerInfo, - txnLimit, timeIncrementSeconds); + List txs = nodeTxService.getNodeTransactions( + trackerInfo, txnLimit, timeIncrementSeconds); logger.debug("Found {} transactions", txs.size()); @@ -121,7 +121,9 @@ TrackerInfo processTxnRange(TrackerInfo trackerInfo, int txnLimit, int timeIncre for(Transaction tx : txs) { try { retryingTransactionHelper.doInTransaction(() -> { - // do work + // Collect node references within transaction range + List nodes = nodeTxService.getNodeReferences(tx); + // TODO validate return null; }, true, false); } catch (Exception e) { @@ -138,8 +140,4 @@ TrackerInfo processTxnRange(TrackerInfo trackerInfo, int txnLimit, int timeIncre } return trackerInfo; } - - private List getNodeTransactions(TrackerInfo trackerInfo, int txnLimit, int timeIncrementSeconds) { - return new ArrayList<>(); - } -} \ No newline at end of file +} diff --git a/alfresco-health-processor-platform/src/main/java/eu/xenit/alfresco/processor/service/NodeTxService.java b/alfresco-health-processor-platform/src/main/java/eu/xenit/alfresco/processor/service/NodeTxService.java new file mode 100644 index 00000000..60b72afa --- /dev/null +++ b/alfresco-health-processor-platform/src/main/java/eu/xenit/alfresco/processor/service/NodeTxService.java @@ -0,0 +1,72 @@ +package eu.xenit.alfresco.processor.service; + +import eu.xenit.alfresco.processor.model.TrackerInfo; +import lombok.AllArgsConstructor; +import org.alfresco.repo.solr.NodeParameters; +import org.alfresco.repo.solr.SOLRTrackingComponent; +import org.alfresco.repo.solr.Transaction; +import org.alfresco.service.cmr.repository.NodeRef; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.time.Instant; +import java.time.LocalDateTime; +import java.time.ZoneId; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.stream.Collectors; + +@AllArgsConstructor +public class NodeTxService { + private static final Logger logger = LoggerFactory.getLogger(NodeTxService.class); + + protected final SOLRTrackingComponent tracker; + + public List getNodeReferences(Transaction tx) { + return getNodeReferences(Collections.singletonList(tx)); + } + + public List getNodeReferences(List txns) { + try { + List txnIds = txns.stream().map(Transaction::getId).collect(Collectors.toList()); + String txnIdsString = txnIds.stream().map(Object::toString) + .collect(Collectors.joining(",")); + logger.trace("Fetching txs: {}", txnIdsString); + NodeParameters params = new NodeParameters(); + params.setTransactionIds(txnIds); + final List nodeRefs = new ArrayList<>(); + SOLRTrackingComponent.NodeQueryCallback callback = node -> { + nodeRefs.add(node.getNodeRef()); + return true; + }; + tracker.getNodes(params, callback); + String nodeRefsString = nodeRefs.stream() + .map(Object::toString) + .collect(Collectors.joining(",")); + logger.trace("Received nodes: {}", nodeRefsString); + return nodeRefs; + } catch (Exception ex) { + logger.error("Impossible to read tracker info: " + ex.getMessage(), ex); + return new ArrayList<>(); + } + } + + public List getNodeTransactions(TrackerInfo trackerInfo, int txnLimit, int timeIncrementSeconds) { + long maxCommitTimeEpoch = trackerInfo.getCommitTimeMs() + (timeIncrementSeconds * 1000L); + logger.trace("Fetching a maximum of {} txs from {} to {}, commit time from {} to {}", + txnLimit, trackerInfo.getTransactionId(), Long.MAX_VALUE, + toString(trackerInfo.getCommitTimeMs()), toString(maxCommitTimeEpoch)); + return tracker.getTransactions( + trackerInfo.getTransactionId(), trackerInfo.getCommitTimeMs(), + Long.MAX_VALUE, maxCommitTimeEpoch, + txnLimit + ); + } + + private String toString(long epoch) { + return LocalDateTime + .ofInstant(Instant.ofEpochMilli(epoch), ZoneId.systemDefault()) + .toString(); + } +} From 66c3d2e4b432fe18c8ecb66b8ee3faf11c8fbd1e Mon Sep 17 00:00:00 2001 From: Zlatin Todorinski Date: Thu, 29 Oct 2020 12:29:54 +0100 Subject: [PATCH 4/4] Added validation service and content health check module --- .../alfresco-global.properties | 7 +- .../context/job-scheduling-context.xml | 11 +- .../context/service-context.xml | 1 + .../context/subsystem-context.xml | 2 +- .../context/validation-context.xml | 24 ++++ .../module-context.xml | 1 + .../platform-healthProcessor-context.xml | 9 +- .../modules/ContentExistenceValidator.java | 65 ++++++++++ .../processor/modules/NodeValidator.java | 7 ++ .../processor/service/CycleService.java | 21 +++- .../processor/service/IProcessorService.java | 5 + .../processor/service/ProcessorService.java | 6 +- .../processor/service/ProgressTracker.java | 7 +- .../processor/service/ValidationService.java | 36 ++++++ .../processor/tasks/ProcessorTask.java | 4 +- .../ContentExistenceValidatorTest.java | 114 ++++++++++++++++++ .../service/ValidationServiceTest.java | 108 +++++++++++++++++ integration-tests/build.gradle | 5 +- .../test/resources/compose/docker-compose.yml | 26 +++- 19 files changed, 433 insertions(+), 26 deletions(-) create mode 100644 alfresco-health-processor-platform/src/main/amp/config/alfresco/module/alfresco-health-processor/context/validation-context.xml create mode 100644 alfresco-health-processor-platform/src/main/java/eu/xenit/alfresco/processor/modules/ContentExistenceValidator.java create mode 100644 alfresco-health-processor-platform/src/main/java/eu/xenit/alfresco/processor/modules/NodeValidator.java create mode 100644 alfresco-health-processor-platform/src/main/java/eu/xenit/alfresco/processor/service/IProcessorService.java create mode 100644 alfresco-health-processor-platform/src/main/java/eu/xenit/alfresco/processor/service/ValidationService.java create mode 100644 alfresco-health-processor-platform/src/test/java/eu/xenit/alfresco/processor/modules/ContentExistenceValidatorTest.java create mode 100644 alfresco-health-processor-platform/src/test/java/eu/xenit/alfresco/processor/service/ValidationServiceTest.java diff --git a/alfresco-health-processor-platform/src/main/amp/config/alfresco/module/alfresco-health-processor/alfresco-global.properties b/alfresco-health-processor-platform/src/main/amp/config/alfresco/module/alfresco-health-processor/alfresco-global.properties index eead4610..e571d5e8 100644 --- a/alfresco-health-processor-platform/src/main/amp/config/alfresco/module/alfresco-health-processor/alfresco-global.properties +++ b/alfresco-health-processor-platform/src/main/amp/config/alfresco/module/alfresco-health-processor/alfresco-global.properties @@ -1,10 +1,11 @@ eu.xenit.alfresco.processor.task.cron=* * * * * ? 2099 eu.xenit.alfresco.processor.task.delay=0 eu.xenit.alfresco.processor.enabled=true +eu.xenit.alfresco.processor.tracking.threadpool-size=5 eu.xenit.alfresco.processor.tracking.enabled=true -eu.xenit.alfresco.processor.run-once=true -eu.xenit.alfresco.processor.scope=ALL +eu.xenit.alfresco.processor.run-once=false eu.xenit.alfresco.processor.transaction.start=1 eu.xenit.alfresco.processor.transaction.limit=1000 eu.xenit.alfresco.processor.transaction.time.increment=15 -eu.xenit.alfresco.processor.transaction.time.start=2020-10-08 +eu.xenit.alfresco.processor.transaction.time.start=2020-11-26 +eu.xenit.alfresco.processor.validation.content.type-filter={http://www.alfresco.org/model/content/1.0}content diff --git a/alfresco-health-processor-platform/src/main/amp/config/alfresco/module/alfresco-health-processor/context/job-scheduling-context.xml b/alfresco-health-processor-platform/src/main/amp/config/alfresco/module/alfresco-health-processor/context/job-scheduling-context.xml index e2d110e1..b88e43bb 100644 --- a/alfresco-health-processor-platform/src/main/amp/config/alfresco/module/alfresco-health-processor/context/job-scheduling-context.xml +++ b/alfresco-health-processor-platform/src/main/amp/config/alfresco/module/alfresco-health-processor/context/job-scheduling-context.xml @@ -9,17 +9,17 @@ - - - + ${eu.xenit.alfresco.processor.task.cron} @@ -28,12 +28,11 @@ - + - + diff --git a/alfresco-health-processor-platform/src/main/amp/config/alfresco/module/alfresco-health-processor/context/service-context.xml b/alfresco-health-processor-platform/src/main/amp/config/alfresco/module/alfresco-health-processor/context/service-context.xml index d0cd28f4..93da89d5 100644 --- a/alfresco-health-processor-platform/src/main/amp/config/alfresco/module/alfresco-health-processor/context/service-context.xml +++ b/alfresco-health-processor-platform/src/main/amp/config/alfresco/module/alfresco-health-processor/context/service-context.xml @@ -25,5 +25,6 @@ + \ No newline at end of file diff --git a/alfresco-health-processor-platform/src/main/amp/config/alfresco/module/alfresco-health-processor/context/subsystem-context.xml b/alfresco-health-processor-platform/src/main/amp/config/alfresco/module/alfresco-health-processor/context/subsystem-context.xml index 17b3e0e5..daad11ce 100644 --- a/alfresco-health-processor-platform/src/main/amp/config/alfresco/module/alfresco-health-processor/context/subsystem-context.xml +++ b/alfresco-health-processor-platform/src/main/amp/config/alfresco/module/alfresco-health-processor/context/subsystem-context.xml @@ -9,7 +9,7 @@ - eu.xenit.alfresco.processor.service.ProcessorService + eu.xenit.alfresco.processor.service.IProcessorService diff --git a/alfresco-health-processor-platform/src/main/amp/config/alfresco/module/alfresco-health-processor/context/validation-context.xml b/alfresco-health-processor-platform/src/main/amp/config/alfresco/module/alfresco-health-processor/context/validation-context.xml new file mode 100644 index 00000000..ff26ce14 --- /dev/null +++ b/alfresco-health-processor-platform/src/main/amp/config/alfresco/module/alfresco-health-processor/context/validation-context.xml @@ -0,0 +1,24 @@ + + + + + ${eu.xenit.alfresco.processor.validation.content.type-filter} + + + + + + + + + + + + + \ No newline at end of file diff --git a/alfresco-health-processor-platform/src/main/amp/config/alfresco/module/alfresco-health-processor/module-context.xml b/alfresco-health-processor-platform/src/main/amp/config/alfresco/module/alfresco-health-processor/module-context.xml index b4ce3ea7..d8437e93 100644 --- a/alfresco-health-processor-platform/src/main/amp/config/alfresco/module/alfresco-health-processor/module-context.xml +++ b/alfresco-health-processor-platform/src/main/amp/config/alfresco/module/alfresco-health-processor/module-context.xml @@ -3,6 +3,7 @@ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd"> + diff --git a/alfresco-health-processor-platform/src/main/amp/config/alfresco/subsystems/healthProcessor/platform/platform-healthProcessor-context.xml b/alfresco-health-processor-platform/src/main/amp/config/alfresco/subsystems/healthProcessor/platform/platform-healthProcessor-context.xml index bb5d86b5..0abc748b 100644 --- a/alfresco-health-processor-platform/src/main/amp/config/alfresco/subsystems/healthProcessor/platform/platform-healthProcessor-context.xml +++ b/alfresco-health-processor-platform/src/main/amp/config/alfresco/subsystems/healthProcessor/platform/platform-healthProcessor-context.xml @@ -6,8 +6,15 @@ - + + + + + ${eu.xenit.alfresco.processor.tracking.threadpool-size} + + + diff --git a/alfresco-health-processor-platform/src/main/java/eu/xenit/alfresco/processor/modules/ContentExistenceValidator.java b/alfresco-health-processor-platform/src/main/java/eu/xenit/alfresco/processor/modules/ContentExistenceValidator.java new file mode 100644 index 00000000..e69b8361 --- /dev/null +++ b/alfresco-health-processor-platform/src/main/java/eu/xenit/alfresco/processor/modules/ContentExistenceValidator.java @@ -0,0 +1,65 @@ +package eu.xenit.alfresco.processor.modules; + +import lombok.RequiredArgsConstructor; +import org.alfresco.model.ContentModel; +import org.alfresco.service.cmr.dictionary.DictionaryService; +import org.alfresco.service.cmr.repository.ContentReader; +import org.alfresco.service.cmr.repository.ContentService; +import org.alfresco.service.cmr.repository.NodeRef; +import org.alfresco.service.cmr.repository.NodeService; +import org.alfresco.service.namespace.QName; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.security.InvalidParameterException; + +@RequiredArgsConstructor +public class ContentExistenceValidator implements NodeValidator { + private static final Logger logger = LoggerFactory.getLogger(ContentExistenceValidator.class); + + private QName filterTypeQName = null; + + protected final String filterType; + + protected final ContentService contentService; + + protected final NodeService nodeService; + + protected final DictionaryService dictionaryService; + + ContentExistenceValidator initialize() { + filterTypeQName = QName.createQName(filterType); + return this; + } + + @Override + public boolean validate(NodeRef nodeRef) { + logger.trace(">>> Entering ContentExistenceValidator.validate()"); + if(nodeRef == null) { + throw new InvalidParameterException("NodeRef cannot be null!"); + } + + if(!this.nodeService.exists(nodeRef)) { + logger.debug("Node does not exist: {}", nodeRef); + return false; + } + + QName type = this.nodeService.getType(nodeRef); + if(!this.dictionaryService.isSubClass(type, filterTypeQName)) { + logger.debug("Node {} is of type {}, skipping because it's subclass of {}", nodeRef, type, filterTypeQName); + return true; + } + + ContentReader reader = this.contentService + .getReader(nodeRef, ContentModel.PROP_CONTENT); + + if(reader != null && reader.exists()) { + logger.debug("Node Content Does Exist {}", nodeRef); + logger.trace("<<< Exiting ContentExistenceValidator.validate()"); + return true; + } + + logger.error("Node Content Does Not Exist {}", nodeRef); + return false; + } +} diff --git a/alfresco-health-processor-platform/src/main/java/eu/xenit/alfresco/processor/modules/NodeValidator.java b/alfresco-health-processor-platform/src/main/java/eu/xenit/alfresco/processor/modules/NodeValidator.java new file mode 100644 index 00000000..6714f613 --- /dev/null +++ b/alfresco-health-processor-platform/src/main/java/eu/xenit/alfresco/processor/modules/NodeValidator.java @@ -0,0 +1,7 @@ +package eu.xenit.alfresco.processor.modules; + +import org.alfresco.service.cmr.repository.NodeRef; + +public interface NodeValidator { + boolean validate(NodeRef nodeRef); +} diff --git a/alfresco-health-processor-platform/src/main/java/eu/xenit/alfresco/processor/service/CycleService.java b/alfresco-health-processor-platform/src/main/java/eu/xenit/alfresco/processor/service/CycleService.java index 06987704..a6a6579e 100644 --- a/alfresco-health-processor-platform/src/main/java/eu/xenit/alfresco/processor/service/CycleService.java +++ b/alfresco-health-processor-platform/src/main/java/eu/xenit/alfresco/processor/service/CycleService.java @@ -17,22 +17,25 @@ import java.util.concurrent.atomic.AtomicBoolean; @AllArgsConstructor -public class CycleService { +public class CycleService { private static final Logger logger = LoggerFactory.getLogger(CycleService.class); protected final RetryingTransactionHelper retryingTransactionHelper; protected final ProgressTracker progressTracker; protected final NodeTxService nodeTxService; + protected final ValidationService validationService; public void execute(HealthProcessorConfiguration configurationService) { Cycle cycle = createCycle( configurationService); - AtomicBoolean continueCycle = new AtomicBoolean(); + AtomicBoolean continueCycle = new AtomicBoolean(true); run(configurationService, cycle, continueCycle); while(configurationService.isEnabled() && !configurationService.isRunOnce() && continueCycle.get()) { + logger.trace("Restarting the cycle..."); run(configurationService, cycle, continueCycle); } + logger.trace("Cycle completed."); } Cycle createCycle(HealthProcessorConfiguration configurationService) { @@ -60,7 +63,7 @@ void run(HealthProcessorConfiguration configurationService, Cycle cycle, AtomicB if(reachedMaxTx.get()) { try { - logger.error("Max transaction reached, entering idle state"); + logger.debug("Max transaction reached, entering idle state"); Thread.sleep(configurationService.getTimeIncrementSeconds() * 1000); } catch (InterruptedException e) { logger.error("Idling has failed, aborting..."); @@ -75,7 +78,7 @@ void start(Cycle cycle) { int timeIncrementSeconds = cycle.getTimeIncrementSeconds(); long firstCommitTime = cycle.getFirstCommitTime(); - logger.debug("Tracking changes ... Start commit time: {}", LocalDateTime + logger.trace("Tracking changes ... Start commit time: {}", LocalDateTime .ofInstant(Instant.ofEpochMilli(firstCommitTime), ZoneId.systemDefault()) .toString()); @@ -97,6 +100,8 @@ void start(Cycle cycle) { private boolean txnHistoryIsCatchingUp(long timeIncrementEpoch, TrackerInfo trackerInfo) { long supposedLastScanTime = OffsetDateTime.now().toInstant().toEpochMilli() - timeIncrementEpoch; + logger.trace("supposedLastScanTime {} > trackerInfo.getCommitTimeMs() {}", + supposedLastScanTime , trackerInfo.getCommitTimeMs()); return supposedLastScanTime > trackerInfo.getCommitTimeMs(); } @@ -105,6 +110,9 @@ TrackerInfo processTxnRange(TrackerInfo trackerInfo, int txnLimit, int timeIncre long maxTxId = trackerInfo.getTransactionId(); long maxCommitTimeMs = trackerInfo.getCommitTimeMs(); + logger.debug("maxTxId {}", maxTxId); + logger.debug("maxCommitTimeMs {}", maxCommitTimeMs); + try { List txs = nodeTxService.getNodeTransactions( trackerInfo, txnLimit, timeIncrementSeconds); @@ -116,14 +124,15 @@ TrackerInfo processTxnRange(TrackerInfo trackerInfo, int txnLimit, int timeIncre maxCommitTimeMs = txs.stream().map(Transaction::getCommitTimeMs) .max(Long::compare).get(); maxTxId = txs.stream().map(Transaction::getId) - .max(Long::compare).get(); + .max(Long::compare) + .orElse(-1L); for(Transaction tx : txs) { try { retryingTransactionHelper.doInTransaction(() -> { // Collect node references within transaction range List nodes = nodeTxService.getNodeReferences(tx); - // TODO validate + validationService.validate(nodes); return null; }, true, false); } catch (Exception e) { diff --git a/alfresco-health-processor-platform/src/main/java/eu/xenit/alfresco/processor/service/IProcessorService.java b/alfresco-health-processor-platform/src/main/java/eu/xenit/alfresco/processor/service/IProcessorService.java new file mode 100644 index 00000000..74376703 --- /dev/null +++ b/alfresco-health-processor-platform/src/main/java/eu/xenit/alfresco/processor/service/IProcessorService.java @@ -0,0 +1,5 @@ +package eu.xenit.alfresco.processor.service; + +public interface IProcessorService { + void validateHealth(); +} diff --git a/alfresco-health-processor-platform/src/main/java/eu/xenit/alfresco/processor/service/ProcessorService.java b/alfresco-health-processor-platform/src/main/java/eu/xenit/alfresco/processor/service/ProcessorService.java index 0e863148..a659a297 100644 --- a/alfresco-health-processor-platform/src/main/java/eu/xenit/alfresco/processor/service/ProcessorService.java +++ b/alfresco-health-processor-platform/src/main/java/eu/xenit/alfresco/processor/service/ProcessorService.java @@ -8,7 +8,7 @@ import java.util.concurrent.ExecutorService; @AllArgsConstructor -public class ProcessorService { +public class ProcessorService implements IProcessorService { private static final Logger logger = LoggerFactory.getLogger(ProcessorService.class); protected final RetryingTransactionHelper retryingTransactionHelper; @@ -17,6 +17,7 @@ public class ProcessorService { protected final ProcessorAttributeService processorAttributeService; protected final CycleService cycleService; + @Override public void validateHealth() { if(!configuration.isEnabled()) { logger.info("Health validation initiated, but it is not enabled, aborting."); @@ -29,8 +30,9 @@ public void validateHealth() { return; } + logger.debug("Scheduling health validation!"); executorService.submit(() -> { - logger.trace("Current thread id: {}", Thread.currentThread().getId()); + logger.debug("Current thread id: {}", Thread.currentThread().getId()); doInTransaction( () -> processorAttributeService .persistAttribute(ProcessorAttributeService.ATTR_KEY_IS_RUNNING, true), diff --git a/alfresco-health-processor-platform/src/main/java/eu/xenit/alfresco/processor/service/ProgressTracker.java b/alfresco-health-processor-platform/src/main/java/eu/xenit/alfresco/processor/service/ProgressTracker.java index 89a3c9f2..7ffcf9bb 100644 --- a/alfresco-health-processor-platform/src/main/java/eu/xenit/alfresco/processor/service/ProgressTracker.java +++ b/alfresco-health-processor-platform/src/main/java/eu/xenit/alfresco/processor/service/ProgressTracker.java @@ -19,9 +19,14 @@ public class ProgressTracker { public void updateTrackerInfo(TrackerInfo trackerInfo, final long maxTransactionId, final long maxCommitTime) { + OffsetDateTime now = OffsetDateTime.now(); + logger.debug("Setting maxTransactionIdto {}", maxTransactionId); + logger.debug("Setting maxCommitTime to {}", maxCommitTime); + logger.debug("Setting timestamp to {}", now); + logger.debug("Setting name to {}", this.getClass().getName()); trackerInfo.setTransactionId(maxTransactionId); trackerInfo.setCommitTimeMs(maxCommitTime); - trackerInfo.setTimestamp(OffsetDateTime.now()); + trackerInfo.setTimestamp(now); trackerInfo.setName(this.getClass().getName()); } diff --git a/alfresco-health-processor-platform/src/main/java/eu/xenit/alfresco/processor/service/ValidationService.java b/alfresco-health-processor-platform/src/main/java/eu/xenit/alfresco/processor/service/ValidationService.java new file mode 100644 index 00000000..f0ef3383 --- /dev/null +++ b/alfresco-health-processor-platform/src/main/java/eu/xenit/alfresco/processor/service/ValidationService.java @@ -0,0 +1,36 @@ +package eu.xenit.alfresco.processor.service; + +import eu.xenit.alfresco.processor.modules.NodeValidator; +import lombok.AllArgsConstructor; +import org.alfresco.service.cmr.repository.NodeRef; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.security.InvalidParameterException; +import java.util.List; + +@AllArgsConstructor +public class ValidationService { + private static final Logger logger = LoggerFactory.getLogger(ValidationService.class); + + protected final List nodeValidators; + + public void validate(List nodeRefs) { + if(nodeRefs == null || nodeRefs.isEmpty()) { + throw new InvalidParameterException("NodeRefs list cannot be null or empty!"); + } + if(nodeValidators == null || nodeValidators.isEmpty()) { + logger.warn("No validators provided, aborting!"); + return; + } + for(NodeRef nodeRef : nodeRefs) { + for (NodeValidator nodeValidator : nodeValidators) { + if(!nodeValidator.validate(nodeRef)) { + // TODO Throw an exception ? + logger.warn("NodeRef {} is in invalid state!", nodeRef); + break; + } + } + } + } +} diff --git a/alfresco-health-processor-platform/src/main/java/eu/xenit/alfresco/processor/tasks/ProcessorTask.java b/alfresco-health-processor-platform/src/main/java/eu/xenit/alfresco/processor/tasks/ProcessorTask.java index 0c2159b0..32ffe332 100644 --- a/alfresco-health-processor-platform/src/main/java/eu/xenit/alfresco/processor/tasks/ProcessorTask.java +++ b/alfresco-health-processor-platform/src/main/java/eu/xenit/alfresco/processor/tasks/ProcessorTask.java @@ -1,6 +1,6 @@ package eu.xenit.alfresco.processor.tasks; -import eu.xenit.alfresco.processor.service.ProcessorService; +import eu.xenit.alfresco.processor.service.IProcessorService; import lombok.AllArgsConstructor; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -9,7 +9,7 @@ public class ProcessorTask { private static final Logger logger = LoggerFactory.getLogger(ProcessorTask.class); - private final ProcessorService processorService; + private final IProcessorService processorService; public void execute() { logger.trace("Invoke health-processor service here"); diff --git a/alfresco-health-processor-platform/src/test/java/eu/xenit/alfresco/processor/modules/ContentExistenceValidatorTest.java b/alfresco-health-processor-platform/src/test/java/eu/xenit/alfresco/processor/modules/ContentExistenceValidatorTest.java new file mode 100644 index 00000000..b45fa4b2 --- /dev/null +++ b/alfresco-health-processor-platform/src/test/java/eu/xenit/alfresco/processor/modules/ContentExistenceValidatorTest.java @@ -0,0 +1,114 @@ +package eu.xenit.alfresco.processor.modules; + +import org.alfresco.service.cmr.dictionary.DictionaryService; +import org.alfresco.service.cmr.repository.ContentReader; +import org.alfresco.service.cmr.repository.ContentService; +import org.alfresco.service.cmr.repository.NodeRef; +import org.alfresco.service.cmr.repository.NodeService; +import org.alfresco.service.namespace.QName; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; + +import java.security.InvalidParameterException; + +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +@ExtendWith(MockitoExtension.class) +public class ContentExistenceValidatorTest { + private final NodeRef nodeRef = new NodeRef("workspace://SpacesStore/a88561b3-c631-44cb-a883-180c6107a60b"); + private final String filterType = "{http://www.alfresco.org/model/content/1.0}content"; + private final QName folderType = QName.createQName("{http://www.alfresco.org/model/content/1.0}folder"); + private final QName filterTypeQName = QName.createQName(filterType); + + @Mock + private ContentReader reader; + + @Mock + private ContentService contentService; + + @Mock + private NodeService nodeService; + + @Mock + private DictionaryService dictionaryService; + + @Test + public void nodeDoesNotExistTest() { + when(nodeService.exists(any(NodeRef.class))) + .thenReturn(false); + boolean exists = new ContentExistenceValidator(filterType, contentService, + nodeService, dictionaryService) + .initialize() + .validate(nodeRef); + assertFalse(exists); + verify(dictionaryService, never()) + .isSubClass(any(QName.class), any(QName.class)); + verify(contentService, never()) + .getReader( + any(NodeRef.class), + any(QName.class) + ); + verify(reader, never()).exists(); + } + + @Test void nodeFilteredTest() { + when(nodeService.exists(any(NodeRef.class))) + .thenReturn(true); + when(nodeService.getType(any(NodeRef.class))) + .thenReturn(folderType); + when(dictionaryService + .isSubClass(any(QName.class), any(QName.class))) + .thenReturn(false); + boolean exists = new ContentExistenceValidator(filterType, contentService, + nodeService, dictionaryService) + .initialize() + .validate(nodeRef); + assertTrue(exists); + verify(contentService, never()) + .getReader( + any(NodeRef.class), + any(QName.class) + ); + verify(reader, never()).exists(); + } + + @Test + public void validateContentExistenceTest() { + when(nodeService.exists(any(NodeRef.class))) + .thenReturn(true); + when(nodeService.getType(any(NodeRef.class))) + .thenReturn(filterTypeQName); + when(dictionaryService + .isSubClass(any(QName.class), any(QName.class))) + .thenReturn(true); + when(reader.exists()).thenReturn(true); + when( + contentService.getReader( + any(NodeRef.class), + any(QName.class) + ) + ).thenReturn(reader); + boolean exists = new ContentExistenceValidator(filterType, contentService, + nodeService, dictionaryService) + .initialize() + .validate(nodeRef); + assertTrue(exists); + } + + @Test() + public void nullNodeRefThrowsTest() { + Assertions.assertThrows(InvalidParameterException.class, () -> { + new ContentExistenceValidator(filterType, contentService, + nodeService, dictionaryService) + .validate(null); + }); + } +} diff --git a/alfresco-health-processor-platform/src/test/java/eu/xenit/alfresco/processor/service/ValidationServiceTest.java b/alfresco-health-processor-platform/src/test/java/eu/xenit/alfresco/processor/service/ValidationServiceTest.java new file mode 100644 index 00000000..4dfb1661 --- /dev/null +++ b/alfresco-health-processor-platform/src/test/java/eu/xenit/alfresco/processor/service/ValidationServiceTest.java @@ -0,0 +1,108 @@ +package eu.xenit.alfresco.processor.service; + +import eu.xenit.alfresco.processor.modules.NodeValidator; +import org.alfresco.service.cmr.repository.NodeRef; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; + +import java.security.InvalidParameterException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; + +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.when; + +@ExtendWith(MockitoExtension.class) +public class ValidationServiceTest { + private final NodeRef nodeRefA = + new NodeRef("workspace://SpacesStore/a88561b3-c631-44cb-a883-180c6107a60b"); + private final NodeRef nodeRefB = + new NodeRef("workspace://SpacesStore/b88561b3-c631-44cb-a883-180c6107a60b"); + + @Mock + private NodeValidator validator; + + @Mock + private NodeValidator secondaryValidator; + + @Test + public void invokeValidationTest(){ + final List nodeRefs = Arrays.asList(nodeRefA, nodeRefB); + final AtomicBoolean executed = new AtomicBoolean(false); + new ValidationService(createEternalOptimisticValidatorsList(executed)) + .validate(nodeRefs); + assertTrue(executed.get()); + } + + @Test + public void shortCircuitValidationTest(){ + final List nodeRefs = Arrays.asList(nodeRefA, nodeRefB); + final AtomicBoolean validatorExecuted = new AtomicBoolean(false); + when(validator.validate(nodeRefA)) + .thenAnswer(invocation -> { + validatorExecuted.set(true); + return false; + }); + final List validators = Arrays.asList(validator, secondaryValidator); + new ValidationService(validators) + .validate(nodeRefs); + + // Assert that nodeRefA failed to validate and sit short-circuited the remaining validators + // validator should hit, but not secondaryValidator + // No validation required for secondaryValidator, since it's a mock and we didn't define any expected action + // If anything is called on secondaryValidator, it will throw an exception and fail this test! + assertTrue(validatorExecuted.get()); + } + + @Test + public void invokeValidationWithNoNodeRefsTest(){ + assertThrows(InvalidParameterException.class, () -> { + new ValidationService(new ArrayList<>()) + .validate(new ArrayList<>()); + }); + } + + @Test + public void invokeValidationWithNullNodeRefsTest(){ + assertThrows(InvalidParameterException.class, () -> { + new ValidationService(new ArrayList<>()) + .validate(null); + }); + } + + @Test + public void invokeValidationWithNoValidatorsTest(){ + final List nodeRefs = Arrays.asList(nodeRefA, nodeRefB); + new ValidationService(new ArrayList<>()) + .validate(nodeRefs); + // Gracefully aborted + assertTrue(true); + } + + @Test + public void invokeValidationWithNullValidatorsTest(){ + final List nodeRefs = Arrays.asList(nodeRefA, nodeRefB); + new ValidationService(null) + .validate(nodeRefs); + // Gracefully aborted + assertTrue(true); + } + + private List createEternalOptimisticValidatorsList(final AtomicBoolean executed) { + when(validator.validate(any(NodeRef.class))) + .thenAnswer(invocation -> { + if(executed != null) { + executed.set(true); + } + return true; + }); + return Collections.singletonList(validator); + } +} diff --git a/integration-tests/build.gradle b/integration-tests/build.gradle index f6bbbc8d..dd89e83b 100644 --- a/integration-tests/build.gradle +++ b/integration-tests/build.gradle @@ -85,9 +85,10 @@ subprojects { Project p -> removeVolumes = true captureContainersOutput = false + waitForTcpPorts = false - // Uncomment for quick iterations when developing integration tests - // stopContainers = false +// Uncomment for quick iterations when developing integration tests + stopContainers = false // expose alfresco/inflow on a random port, comment to use default port (8080) environment.put "ALFRESCO_TCP_8080", "8080" diff --git a/integration-tests/src/test/resources/compose/docker-compose.yml b/integration-tests/src/test/resources/compose/docker-compose.yml index 93a3631e..66ea643e 100644 --- a/integration-tests/src/test/resources/compose/docker-compose.yml +++ b/integration-tests/src/test/resources/compose/docker-compose.yml @@ -6,13 +6,35 @@ services: - target: 8080 published: 80 mode: host + - target: 8000 + published: 8000 + mode: host environment: - - CATALINA_OPTS=-Xdebug -agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=8000 - - INDEX=noindex + - INDEX=solr6 + - SOLR_HOST=solr + - SHARE_HOST=share - ENABLE_CLUSTERING=true + - CATALINA_OPTS=-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=8000 depends_on: - postgresql + share: + image: hub.xenit.eu/alfresco-enterprise/alfresco-share-enterprise:5.2 + ports: + - target: 8080 + published: 8082 + mode: host + environment: + - ALFRESCO_HOST=alfresco + depends_on: + - alfresco + + solr: + image: hub.xenit.eu/alfresco-enterprise/alfresco-solr6:1.2.0 + restart: unless-stopped + environment: + - ALFRESCO_HOST=alfresco-core + postgresql: image: docker.io/xenit/postgres environment: