Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
<constructor-arg name="startTxnId" value="${eu.xenit.alfresco.healthprocessor.indexing.txn-id.start}" />
<constructor-arg name="stopTxnId" value="${eu.xenit.alfresco.healthprocessor.indexing.txn-id.stop}" />
<constructor-arg name="backgroundWorkerTransactionsQueueSize" value="${eu.xenit.alfresco.healthprocessor.indexing.single-txns.background-worker-queue-size}" />
<constructor-arg name="transactionMinSizeThreshold" value="${eu.xenit.alfresco.healthprocessor.processing.node-batch-size}" />
</bean>

<bean id="eu.xenit.alfresco.healthprocessor.indexing.lasttxns.LastTxnsIndexingConfiguration"
Expand All @@ -50,6 +51,7 @@
<bean id="eu.xenit.alfresco.healthprocessor.indexing.TrackingComponent"
class="eu.xenit.alfresco.healthprocessor.indexing.Alfresco7TrackingComponent">
<constructor-arg name="trackingComponent" ref="searchTrackingComponent" />
<constructor-arg name="nodeDAO" ref="nodeDAO"/>
</bean>

<bean id="eu.xenit.alfresco.healthprocessor.util.AlfrescoAttributeStore"
Expand All @@ -62,5 +64,7 @@
<constructor-arg name="configuration" ref="eu.xenit.alfresco.healthprocessor.indexing.IndexingConfiguration" />
<constructor-arg name="trackingComponent" ref="eu.xenit.alfresco.healthprocessor.indexing.TrackingComponent" />
<constructor-arg name="attributeStore" ref="eu.xenit.alfresco.healthprocessor.util.AlfrescoAttributeStore" />
<constructor-arg name="nodeDaoAwareTrackingComponent"
ref="eu.xenit.alfresco.healthprocessor.indexing.TrackingComponent" />
</bean>
</beans>
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
<constructor-arg name="mergerThreads" value="${eu.xenit.alfresco.healthprocessor.plugin.solr-transaction-merger.threads}" />
<constructor-arg name="transactionHelper" ref="eu.xenit.alfresco.healthprocessor.util.AlfrescoTransactionHelper" />
<constructor-arg name="nodeService" ref="NodeService" />
<constructor-arg name="nodeDAO" ref="nodeDAO"/>
</bean>

<bean id="eu.xenit.alfresco.healthprocessor.plugins.solr.SolrRequestExecutor"
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package eu.xenit.alfresco.healthprocessor;

import eu.xenit.alfresco.healthprocessor.indexing.TrackingComponent;
import org.alfresco.repo.domain.node.Transaction;

import java.util.List;

public interface NodeDaoAwareTrackingComponent extends TrackingComponent {
int getTransactionCount();
List<Transaction> getNextTransactions(Integer count);
int changesCount(long txnId);
}
Original file line number Diff line number Diff line change
@@ -1,22 +1,60 @@
package eu.xenit.alfresco.healthprocessor.indexing;

import eu.xenit.alfresco.healthprocessor.NodeDaoAwareTrackingComponent;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.alfresco.repo.domain.node.AbstractNodeDAOImpl;
import org.alfresco.repo.domain.node.Transaction;
import org.alfresco.repo.search.SearchTrackingComponent;
import org.alfresco.repo.solr.NodeParameters;
import org.alfresco.service.cmr.repository.NodeRef;
import org.alfresco.service.cmr.repository.StoreRef;

import java.util.HashSet;
import java.util.List;
import java.util.Set;
import lombok.AllArgsConstructor;
import org.alfresco.repo.search.SearchTrackingComponent;
import org.alfresco.repo.solr.NodeParameters;
import java.util.stream.Collectors;

@AllArgsConstructor
public class Alfresco7TrackingComponent implements TrackingComponent {
@Slf4j
@RequiredArgsConstructor
public class Alfresco7TrackingComponent implements NodeDaoAwareTrackingComponent {

private final SearchTrackingComponent trackingComponent;

private final AbstractNodeDAOImpl nodeDAO;

private Long lastTxnTime = -1L;

@Override
public long getMaxTxnId() {
return trackingComponent.getMaxTxnId();
}

@Override
public int getTransactionCount() {
int count = nodeDAO.getTransactionCount();
log.debug("Found {} transactions", count);
return count;
}

@Override
public synchronized List<Transaction> getNextTransactions(Integer count) {
List<Transaction> txns = nodeDAO.selectTxns(lastTxnTime, Long.MAX_VALUE, count,
null, null, true);
long newLastTxnTime = txns.stream().mapToLong(Transaction::getCommitTimeMs).max().orElse(Long.MAX_VALUE);
log.debug("Returning {} transactions from {} to {}", txns, lastTxnTime, newLastTxnTime);
lastTxnTime = newLastTxnTime;
return txns;
}

public int changesCount(long txnId) {
List<NodeRef.Status> changes = this.nodeDAO.getTxnChangesForStore(
StoreRef.STORE_REF_WORKSPACE_SPACESSTORE, txnId);
int count = changes.size();
log.debug("Txn {} has {} changes", txnId, count);
return count;
}

@Override
public Set<NodeInfo> getNodesForTxnIds(List<Long> txnIds) {
if(txnIds.isEmpty()) {
Expand All @@ -30,6 +68,10 @@ public Set<NodeInfo> getNodesForTxnIds(List<Long> txnIds) {
return true;
});

List<String> ids = txnIds.stream()
.map(l -> Long.toString(l))
.collect(Collectors.toList());
log.debug("Returning {} nodes for transactions {}", ret.size(), String.join(",", ids));
return ret;
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package eu.xenit.alfresco.healthprocessor.indexing;

import eu.xenit.alfresco.healthprocessor.NodeDaoAwareTrackingComponent;
import eu.xenit.alfresco.healthprocessor.indexing.lasttxns.LastTxnsBasedIndexingStrategy;
import eu.xenit.alfresco.healthprocessor.indexing.lasttxns.LastTxnsIndexingConfiguration;
import eu.xenit.alfresco.healthprocessor.indexing.singletxns.SingleTransactionIndexingConfiguration;
Expand All @@ -17,6 +18,8 @@ public final class IndexingStrategyFactoryBean extends AbstractFactoryBean<Index
private final TrackingComponent trackingComponent;
private final AttributeStore attributeStore;

private final NodeDaoAwareTrackingComponent nodeDaoAwareTrackingComponent;

@Override
public Class<?> getObjectType() {
return IndexingStrategy.class;
Expand All @@ -30,11 +33,14 @@ protected IndexingStrategy createInstance() {
private IndexingStrategy createIndexingStrategy(IndexingStrategy.IndexingStrategyKey indexingStrategy) {
switch(indexingStrategy) {
case TXNID:
return new TxnIdBasedIndexingStrategy((TxnIdIndexingConfiguration) configuration, trackingComponent, attributeStore);
return new TxnIdBasedIndexingStrategy(
(TxnIdIndexingConfiguration) configuration, trackingComponent, attributeStore);
case LAST_TXNS:
return new LastTxnsBasedIndexingStrategy((LastTxnsIndexingConfiguration) configuration, trackingComponent);
return new LastTxnsBasedIndexingStrategy(
(LastTxnsIndexingConfiguration) configuration, trackingComponent);
case SINGLE_TXNS:
return new SingleTransactionIndexingStrategy(trackingComponent, (SingleTransactionIndexingConfiguration) configuration);
return new SingleTransactionIndexingStrategy(
nodeDaoAwareTrackingComponent, (SingleTransactionIndexingConfiguration) configuration);
default:
throw new IllegalArgumentException("Unknown indexing strategy: "+ indexingStrategy);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
package eu.xenit.alfresco.healthprocessor.indexing.singletxns;

import com.google.common.base.Strings;
import eu.xenit.alfresco.healthprocessor.NodeDaoAwareTrackingComponent;
import eu.xenit.alfresco.healthprocessor.indexing.TrackingComponent;
import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;
import org.alfresco.repo.domain.node.Transaction;
import org.alfresco.service.cmr.repository.NodeRef;
import org.apache.commons.lang3.tuple.Pair;

Expand All @@ -16,15 +19,18 @@
public class SingleTransactionIndexingBackgroundWorker implements Runnable {

private final @NonNull BlockingDeque<@NonNull Pair<@NonNull Long, @NonNull Set<@NonNull NodeRef>>> buffer;
private final @NonNull TrackingComponent trackingComponent;
private final @NonNull NodeDaoAwareTrackingComponent trackingComponent;
private final @NonNull SingleTransactionIndexingState state;
private final int aggregateThreshhold;

public SingleTransactionIndexingBackgroundWorker(@NonNull TrackingComponent trackingComponent,
public SingleTransactionIndexingBackgroundWorker(@NonNull NodeDaoAwareTrackingComponent trackingComponent,
@NonNull SingleTransactionIndexingConfiguration configuration,
@NonNull SingleTransactionIndexingState state) {
this.buffer = new LinkedBlockingDeque<>(configuration.getBackgroundWorkerTransactionsQueueSize());
this.trackingComponent = trackingComponent;
this.state = state;
String threshold = configuration.getConfiguration().get("transaction-min-size-threshold");
this.aggregateThreshhold = Strings.isNullOrEmpty(threshold) ? 0 : Integer.parseInt(threshold);

this.state.setCurrentlyProcessedTxnId(-1);
}
Expand All @@ -40,11 +46,20 @@ public void run() {
try {
log.debug("The background worker of the SingleTransactionIndexingStrategy has been started.");

// Thread-safe: the indexer strategy can not increase its current Txn ID if the background worker
// hasn't returned anything yet. Just make sure the current & last Txn ID are set properly before executing this code.
long start = state.getCurrentTxnId();
long end = state.getLastTxnId();
for (long i = start; i < end; i++) handleNextTransaction(i);
// // Thread-safe: the indexer strategy can not increase its current Txn ID if the background worker
// // hasn't returned anything yet. Just make sure the current & last Txn ID are set properly before executing this code.
int maxTxnCount = trackingComponent.getTransactionCount();
for(int txnCount = 1; txnCount <= maxTxnCount; txnCount++) {
log.debug("Getting transaction {}", txnCount);
List<Transaction> transactions = trackingComponent.getNextTransactions(1);
if (transactions == null || transactions.isEmpty()) return; // early exit, nothing more to process

Transaction currentlyProcessedTransaction = transactions.get(0);
long txnId = currentlyProcessedTransaction.getId();
log.trace("Currently processing transaction with ID ({} @ {}).",
txnId, currentlyProcessedTransaction.getCommitTimeMs());
handleNextTransaction(txnId);
}

log.debug("The background worker of the SingleTransactionIndexingStrategy has stopped.");
} catch (InterruptedException e) {
Expand All @@ -59,8 +74,11 @@ public void run() {
}

private void handleNextTransaction(long txnId) throws InterruptedException {
log.trace("Currently processing transaction with ID ({}).", txnId);
Set<TrackingComponent.NodeInfo> fetchedNodes = trackingComponent.getNodesForTxnIds(List.of(txnId));
state.setCurrentTxnId(txnId);
if(fetchedNodes.size() <= aggregateThreshhold) {
return;
}
Set<NodeRef> nodeRefs = fetchedNodes.stream()
.map(TrackingComponent.NodeInfo::getNodeRef)
.collect(Collectors.toSet());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import lombok.NonNull;

import javax.annotation.Nonnull;
import java.util.HashMap;
import java.util.Map;

import static eu.xenit.alfresco.healthprocessor.indexing.IndexingStrategy.IndexingStrategyKey.SINGLE_TXNS;
Expand All @@ -21,22 +22,33 @@ public class SingleTransactionIndexingConfiguration implements IndexingConfigura
private final long startTxnId;
private final long stopTxnId;
private final int backgroundWorkerTransactionsQueueSize;
private final int transactionMinSizeThreshold;
private final @NonNull Map<String, String> configuration;
private final @NonNull IndexingStrategy.IndexingStrategyKey indexingStrategy = SINGLE_TXNS;

public SingleTransactionIndexingConfiguration(long startTxnId, long stopTxnId,
int backgroundWorkerTransactionsQueueSize) {
int backgroundWorkerTransactionsQueueSize,
int transactionMinSizeThreshold) {
if (startTxnId > stopTxnId) throw new IllegalArgumentException(String.format("invalid configuration, startTxnId (%d) > stopId (%d)", startTxnId, stopTxnId));
if (backgroundWorkerTransactionsQueueSize <= 0) throw new IllegalArgumentException("invalid configuration, backgroundWorkerTransactionsQueueSize <= 0");

this.startTxnId = startTxnId;
this.stopTxnId = stopTxnId;
this.backgroundWorkerTransactionsQueueSize = backgroundWorkerTransactionsQueueSize;
this.transactionMinSizeThreshold = transactionMinSizeThreshold;

// We don't need to dynamically generate this. We can just create it once.
this.configuration = Map.of(START_TXN_ID_IDENTIFIER, Long.toString(startTxnId),
STOP_TXN_ID_IDENTIFIER, Long.toString(stopTxnId),
BACKGROUND_WORKER_TRANSACTIONS_QUEUE_SIZE_IDENTIFIER, Long.toString(backgroundWorkerTransactionsQueueSize));
}

@Nonnull
@Override
public Map<String, String> getConfiguration() {
Map<String, String> ret = new HashMap<>();
ret.put("transaction-min-size-threshold", Long.toString(transactionMinSizeThreshold));
return ret;
}

}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package eu.xenit.alfresco.healthprocessor.indexing.singletxns;

import eu.xenit.alfresco.healthprocessor.NodeDaoAwareTrackingComponent;
import eu.xenit.alfresco.healthprocessor.indexing.IndexingStrategy;
import eu.xenit.alfresco.healthprocessor.indexing.NullCycleProgress;
import eu.xenit.alfresco.healthprocessor.indexing.SimpleCycleProgress;
Expand All @@ -10,14 +11,12 @@
import lombok.Synchronized;
import lombok.extern.slf4j.Slf4j;
import org.alfresco.service.cmr.repository.NodeRef;
import org.apache.commons.collections4.set.UnmodifiableSet;
import org.apache.commons.lang3.tuple.Pair;

import javax.annotation.Nullable;
import java.util.*;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.LongSupplier;
import java.util.stream.Collectors;

@Slf4j
public class SingleTransactionIndexingStrategy implements IndexingStrategy {
Expand All @@ -34,9 +33,9 @@ public class SingleTransactionIndexingStrategy implements IndexingStrategy {
private final @NonNull SingleTransactionIndexingState state = new SingleTransactionIndexingState();
private final @NonNull SingleTransactionIndexingBackgroundWorker backgroundWorker;
private @Nullable Thread backgroundWorkerThread;
private final @NonNull LongSupplier progressSupplier = state::getCurrentTxnId;
private final @NonNull LongSupplier progressSupplier = state::getCurrentlyProcessedTxnId;

public SingleTransactionIndexingStrategy(@NonNull TrackingComponent trackingComponent,
public SingleTransactionIndexingStrategy(@NonNull NodeDaoAwareTrackingComponent trackingComponent,
@NonNull SingleTransactionIndexingConfiguration configuration) {
log.warn("A SingleTransactionIndexingStrategy has been created as part of the health processor setup. " +
"Please note that this strategy ignores the amount of requested nodeRefs and always returns exactly " +
Expand All @@ -46,7 +45,7 @@ public SingleTransactionIndexingStrategy(@NonNull TrackingComponent trackingComp
this.configuration = configuration;
this.trackingComponent = trackingComponent;

this.state.setCurrentTxnId(-1);
this.state.setCurrentTxnId(-1L);
this.state.setLastTxnId(configuration.getStopTxnId());
}

Expand All @@ -72,7 +71,6 @@ public void onStart() {
Pair<Long, Set<NodeRef>> txnIdAndNodeRefs;
do {
txnIdAndNodeRefs = backgroundWorker.takeNextTransaction();
state.setCurrentTxnId(txnIdAndNodeRefs.getLeft());
} while (txnIdAndNodeRefs.getRight().isEmpty() && txnIdAndNodeRefs.getLeft() != -1);

return txnIdAndNodeRefs.getRight();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,20 @@
import eu.xenit.alfresco.healthprocessor.reporter.api.NodeHealthReport;
import eu.xenit.alfresco.healthprocessor.util.TransactionHelper;
import lombok.NonNull;
import lombok.Synchronized;
import lombok.extern.slf4j.Slf4j;
import org.alfresco.repo.domain.node.AbstractNodeDAOImpl;
import org.alfresco.repo.domain.node.NodeDAO;
import org.alfresco.repo.security.authentication.AuthenticationUtil;
import org.alfresco.service.cmr.repository.NodeRef;
import org.alfresco.service.cmr.repository.NodeService;
import org.alfresco.service.cmr.repository.StoreRef;
import org.alfresco.service.namespace.QName;
import org.alfresco.util.Pair;

import javax.annotation.Nonnull;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
Expand All @@ -37,10 +40,11 @@ public class SolrUndersizedTransactionsHealthProcessorPlugin extends ToggleableH
private boolean receivingNewTransactionsFromIndexer = false;
final @NonNull AtomicInteger queuedMergeRequests = new AtomicInteger(0);
private final @NonNull ExecutorService mergerExecutor;
private final @NonNull NodeDAO nodeDAO;

public SolrUndersizedTransactionsHealthProcessorPlugin(@NonNull Properties properties, boolean enabled,
int threshold, int mergerThreads, @NonNull TransactionHelper transactionHelper,
@NonNull NodeService nodeService) {
@NonNull NodeService nodeService, @NonNull AbstractNodeDAOImpl nodeDAO) {
super(enabled);
guaranteeSingleTransactionIndexerIsUsed(properties);

Expand All @@ -49,6 +53,7 @@ public SolrUndersizedTransactionsHealthProcessorPlugin(@NonNull Properties prope
this.transactionHelper = transactionHelper;
this.nodeService = nodeService;
this.mergerExecutor = Executors.newFixedThreadPool(mergerThreads);
this.nodeDAO = nodeDAO;

SingleTransactionIndexingStrategy.listenToIndexerStart(this::onIndexerStart);
SingleTransactionIndexingStrategy.listenToIndexerStop(this::onIndexerStop);
Expand Down Expand Up @@ -99,16 +104,16 @@ protected Set<NodeHealthReport> doProcess(Set<NodeRef> allNodeRefs) {

private void mergeTransactions(@NonNull Set<@NonNull NodeRef> backgroundWorkerBatch) {
try {
AuthenticationUtil.runAsSystem(() -> {
List<Long> nodeIds = backgroundWorkerBatch.parallelStream()
.map(this.nodeDAO::getNodePair)
.map(Pair::getFirst).collect(Collectors.toList());
// AuthenticationUtil.runAsSystem(() -> {
transactionHelper.inNewTransaction(() -> {
for (NodeRef nodeRef : backgroundWorkerBatch) {
nodeService.addAspect(nodeRef, ASPECT_QNAME, Map.of());
nodeService.removeAspect(nodeRef, ASPECT_QNAME);
}
long txnId = this.nodeDAO.getCurrentTransactionId(false);
this.nodeDAO.touchNodes(txnId, nodeIds);
}, false);

return null;
});
// return null;
// });
} catch (Exception e) {
log.error("An error occurred while merging the transactions.", e);
} finally {
Expand Down
Loading
Loading