diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/CompletedProcedureRecycler.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/CompletedProcedureRecycler.java index cea15a1bfb68..179563cc3b3e 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/CompletedProcedureRecycler.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/CompletedProcedureRecycler.java @@ -70,15 +70,30 @@ protected void periodicExecute(final Env env) { // Failed procedures aren't persisted in WAL. batchIds[batchCount++] = entry.getKey(); if (batchCount == batchIds.length) { - store.delete(batchIds, 0, batchCount); - batchCount = 0; + try { + store.delete(batchIds, 0, batchCount); + } catch (Exception e) { + LOG.error("Error deleting completed procedures {}.", proc, e); + // Do not remove from the completed map. Even this procedure may be restored + // unexpectedly in another new CN leader, we do not need to do anything else since + // procedures are idempotent. + continue; + } finally { + batchCount = 0; + } } it.remove(); LOG.trace("Evict completed {}", proc); } } if (batchCount > 0) { - store.delete(batchIds, 0, batchCount); + try { + store.delete(batchIds, 0, batchCount); + } catch (Exception e) { + // Even this procedure may be restored unexpectedly in another new CN leader, we do not need + // to do anything else since procedures are idempotent. + LOG.error("Error deleting completed procedures {}.", batchIds, e); + } } } } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/Procedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/Procedure.java index 91af03d3971b..89e6e37e431a 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/Procedure.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/Procedure.java @@ -24,6 +24,7 @@ import org.apache.iotdb.confignode.procedure.state.ProcedureLockState; import org.apache.iotdb.confignode.procedure.state.ProcedureState; import org.apache.iotdb.confignode.procedure.store.IProcedureStore; +import org.apache.iotdb.consensus.exception.ConsensusException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -196,7 +197,7 @@ public void deserialize(ByteBuffer byteBuffer) { byteBuffer.get(resultArr); } // has lock - if (byteBuffer.get() == 1) { + if (byteBuffer.get() == 1 && this.state != ProcedureState.ROLLEDBACK) { this.lockedWhenLoading(); } } @@ -300,8 +301,15 @@ public final ProcedureLockState doAcquireLock(Env env, IProcedureStore store) { } ProcedureLockState state = acquireLock(env); if (state == ProcedureLockState.LOCK_ACQUIRED) { - locked = true; - store.update(this); + try { + locked = true; + store.update(this); + } catch (Exception e) { + // Do not need to do anything else. New leader which restore this procedure from a wrong + // state will reexecute it and converge to the correct state since procedures are + // idempotent. + LOG.warn("pid={} Failed to persist lock state to store.", this.procId, e); + } } return state; } @@ -312,12 +320,19 @@ public final ProcedureLockState doAcquireLock(Env env, IProcedureStore store) { * @param env environment * @param store ProcedureStore */ - public final void doReleaseLock(Env env, IProcedureStore store) { + public final void doReleaseLock(Env env, IProcedureStore store) throws Exception { locked = false; - if (getState() != ProcedureState.ROLLEDBACK) { + if (getState() == ProcedureState.ROLLEDBACK) { + LOG.info("Force write unlock state to raft for pid={}", this.procId); + } + try { store.update(this); + // do not release lock when consensus layer is not working + releaseLock(env); + } catch (ConsensusException e) { + LOG.error("pid={} Failed to persist unlock state to store.", this.procId, e); + throw e; } - releaseLock(env); } public final void restoreLock(Env env) { diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/ProcedureExecutor.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/ProcedureExecutor.java index 0d8368583b4e..efd37778efa4 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/ProcedureExecutor.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/ProcedureExecutor.java @@ -20,6 +20,7 @@ package org.apache.iotdb.confignode.procedure; import org.apache.iotdb.commons.concurrent.ThreadName; +import org.apache.iotdb.commons.utils.RetryUtils; import org.apache.iotdb.commons.utils.TestOnly; import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv; import org.apache.iotdb.confignode.procedure.exception.ProcedureException; @@ -221,7 +222,8 @@ Long getRootProcedureId(Procedure proc) { private void releaseLock(Procedure procedure, boolean force) { if (force || !procedure.holdLock(this.environment) || procedure.isFinished()) { - procedure.doReleaseLock(this.environment, store); + RetryUtils.executeWithEndlessBackoffRetry( + () -> procedure.doReleaseLock(this.environment, store), "procedure release lock"); } } @@ -477,7 +479,11 @@ private void countDownChildren(RootProcedureStack rootProcStack, Procedure } if (parent != null && parent.tryRunnable()) { // If success, means all its children have completed, move parent to front of the queue. - store.update(parent); + // Must endless retry here, since this step is not idempotent and can not be re-execute + // correctly in new CN leader. + RetryUtils.executeWithEndlessBackoffRetry( + () -> store.update(parent), "count down children procedure"); + // do not add this procedure when exception occurred scheduler.addFront(parent); LOG.info( "Finished subprocedure pid={}, resume processing ppid={}", @@ -506,21 +512,44 @@ private void updateStoreOnExecution( if (LOG.isDebugEnabled()) { LOG.debug("Stored {}, children {}", proc, Arrays.toString(subprocs)); } - store.update(subprocs); + try { + store.update(subprocs); + } catch (Exception e) { + // Do nothing since this step is idempotent. New CN leader can converge to the correct + // state when restore this procedure. + LOG.warn("Failed to update subprocs on execution", e); + } } else { LOG.debug("Store update {}", proc); if (proc.isFinished() && !proc.hasParent()) { final long[] childProcIds = rootProcStack.getSubprocedureIds(); if (childProcIds != null) { - store.delete(childProcIds); - for (long childProcId : childProcIds) { - procedures.remove(childProcId); + try { + store.delete(childProcIds); + // do not remove these procedures when exception occurred + for (long childProcId : childProcIds) { + procedures.remove(childProcId); + } + } catch (Exception e) { + // Do nothing since this step is idempotent. New CN leader can converge to the correct + // state when restore this procedure. + LOG.warn("Failed to delete subprocedures on execution", e); } } else { - store.update(proc); + try { + store.update(proc); + } catch (Exception e) { + LOG.warn("Failed to update procedure on execution", e); + } } } else { - store.update(proc); + try { + store.update(proc); + } catch (Exception e) { + // Do nothing since this step is idempotent. New CN leader can converge to the correct + // state when restore this procedure. + LOG.warn("Failed to update procedure on execution", e); + } } } } @@ -577,7 +606,9 @@ private ProcedureLockState executeRootStackRollback( if (exception == null) { exception = procedureStack.getException(); rootProcedure.setFailure(exception); - store.update(rootProcedure); + // Endless retry since this step is not idempotent. + RetryUtils.executeWithEndlessBackoffRetry( + () -> store.update(rootProcedure), "root procedure rollback"); } List> subprocStack = procedureStack.getSubproceduresStack(); int stackTail = subprocStack.size(); @@ -653,18 +684,37 @@ private void cleanupAfterRollback(Procedure procedure) { procedure.updateMetricsOnFinish(getEnvironment(), procedure.elapsedTime(), false); if (procedure.hasParent()) { - store.delete(procedure.getProcId()); - procedures.remove(procedure.getProcId()); + try { + store.delete(procedure.getProcId()); + // do not remove this procedure when exception occurred + procedures.remove(procedure.getProcId()); + } catch (Exception e) { + // Do nothing since this step is idempotent. New CN leader can converge to the correct + // state when restore this procedure. + LOG.warn("Failed to delete procedure on rollback", e); + } } else { final long[] childProcIds = rollbackStack.get(procedure.getProcId()).getSubprocedureIds(); - if (childProcIds != null) { - store.delete(childProcIds); - } else { - store.update(procedure); + try { + if (childProcIds != null) { + store.delete(childProcIds); + } else { + store.update(procedure); + } + } catch (Exception e) { + // Do nothing since this step is idempotent. New CN leader can converge to the correct + // state when restore this procedure. + LOG.warn("Failed to delete procedure on rollback", e); } } } else { - store.update(procedure); + try { + store.update(procedure); + } catch (Exception e) { + // Do nothing since this step is idempotent. New CN leader can converge to the correct + // state when restore this procedure. + LOG.warn("Failed to update procedure on rollback", e); + } } } @@ -916,7 +966,11 @@ public long submitProcedure(Procedure procedure) { procedure.setProcId(store.getNextProcId()); procedure.setProcRunnable(); // Commit the transaction - store.update(procedure); + try { + store.update(procedure); + } catch (Exception e) { + LOG.error("Failed to update store procedure {}", procedure, e); + } LOG.debug("{} is stored.", procedure); // Add the procedure to the executor return pushProcedure(procedure); diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/TimeoutExecutorThread.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/TimeoutExecutorThread.java index 5aaf9a623f52..1614148abd30 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/TimeoutExecutorThread.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/TimeoutExecutorThread.java @@ -19,12 +19,16 @@ package org.apache.iotdb.confignode.procedure; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import java.util.concurrent.DelayQueue; import java.util.concurrent.Delayed; import java.util.concurrent.TimeUnit; public class TimeoutExecutorThread extends StoppableThread { + private static final Logger LOGGER = LoggerFactory.getLogger(TimeoutExecutorThread.class); private static final int DELAY_QUEUE_TIMEOUT = 20; private final ProcedureExecutor executor; private final DelayQueue> queue = new DelayQueue<>(); @@ -71,7 +75,13 @@ public void run() { long rootProcId = executor.getRootProcedureId(procedure); RootProcedureStack rollbackStack = executor.getRollbackStack(rootProcId); rollbackStack.abort(); - executor.getStore().update(procedure); + try { + executor.getStore().update(procedure); + } catch (Exception e) { + // Do nothing since new CN leader can converge to the correct state when restore this + // procedure. + LOGGER.warn("Failed to update procedure {}", procedure, e); + } executor.getScheduler().addFront(procedure); } } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ConfigProcedureStore.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ConfigProcedureStore.java index 393c1e93740e..603a9a8a627d 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ConfigProcedureStore.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ConfigProcedureStore.java @@ -89,43 +89,55 @@ public long getNextProcId() { } @Override - public void update(Procedure procedure) { + public void update(Procedure procedure) throws Exception { Objects.requireNonNull(ProcedureFactory.getProcedureType(procedure), "Procedure type is null"); final UpdateProcedurePlan updateProcedurePlan = new UpdateProcedurePlan(procedure); try { configManager.getConsensusManager().write(updateProcedurePlan); } catch (ConsensusException e) { - LOG.warn("Failed in the write API executing the consensus layer due to: ", e); + LOG.warn( + "pid={} Failed in the write update API executing the consensus layer due to: ", + procedure.getProcId(), + e); + // In consensus layer API, do nothing but just throw an exception to let upper caller handle + // it. + throw e; } } @Override - public void update(Procedure[] subprocs) { + public void update(Procedure[] subprocs) throws Exception { for (Procedure subproc : subprocs) { update(subproc); } } @Override - public void delete(long procId) { + public void delete(long procId) throws Exception { DeleteProcedurePlan deleteProcedurePlan = new DeleteProcedurePlan(); deleteProcedurePlan.setProcId(procId); try { configManager.getConsensusManager().write(deleteProcedurePlan); } catch (ConsensusException e) { - LOG.warn("Failed in the write API executing the consensus layer due to: ", e); + LOG.warn( + "pid={} Failed in the write delete API executing the consensus layer due to: ", + procId, + e); + // In consensus layer API, do nothing but just throw an exception to let upper caller handle + // it. + throw e; } } @Override - public void delete(long[] childProcIds) { + public void delete(long[] childProcIds) throws Exception { for (long childProcId : childProcIds) { delete(childProcId); } } @Override - public void delete(long[] batchIds, int startIndex, int batchCount) { + public void delete(long[] batchIds, int startIndex, int batchCount) throws Exception { for (int i = startIndex; i < batchCount; i++) { delete(batchIds[i]); } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/IProcedureStore.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/IProcedureStore.java index 8e8e715fd84f..3dba6d29288d 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/IProcedureStore.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/IProcedureStore.java @@ -38,15 +38,15 @@ public interface IProcedureStore { long getNextProcId(); - void update(Procedure procedure); + void update(Procedure procedure) throws Exception; - void update(Procedure[] subprocs); + void update(Procedure[] subprocs) throws Exception; - void delete(long procId); + void delete(long procId) throws Exception; - void delete(long[] childProcIds); + void delete(long[] childProcIds) throws Exception; - void delete(long[] batchIds, int startIndex, int batchCount); + void delete(long[] batchIds, int startIndex, int batchCount) throws Exception; void cleanup(); diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/RetryUtils.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/RetryUtils.java index 19a6456ec30a..6e586c7074dc 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/RetryUtils.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/RetryUtils.java @@ -19,8 +19,13 @@ package org.apache.iotdb.commons.utils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + public class RetryUtils { + private static final Logger LOGGER = LoggerFactory.getLogger(RetryUtils.class); + public interface CallableWithException { T call() throws E; } @@ -42,6 +47,58 @@ public static T retryOnException( } } + private static final long INITIAL_BACKOFF_MS = 100; + private static final long MAX_BACKOFF_MS = 60000; + + @FunctionalInterface + public interface OperationWithException { + void run() throws Exception; + } + + /** + * Exponential backoff retry helper method. + * + * @param operation The operation to execute. + * @param operationName A description of the operation (for logging). + */ + public static void executeWithEndlessBackoffRetry( + OperationWithException operation, String operationName) { + long currentBackoff = INITIAL_BACKOFF_MS; + int attempt = 0; + + // Endless retry + while (true) { + attempt++; + try { + operation.run(); + if (attempt > 1) { + LOGGER.info("Operation '{}' succeeded after {} attempts", operationName, attempt); + } + return; + } catch (Exception e) { + LOGGER.warn( + "Operation '{}' failed (attempt {}). Retrying in {}ms...", + operationName, + attempt, + currentBackoff, + e); + try { + Thread.sleep(currentBackoff); + } catch (InterruptedException ie) { + LOGGER.warn( + "Retry wait for operation '{}' was interrupted, stopping retries.", + operationName, + ie); + Thread.currentThread().interrupt(); + return; + } + + // Double the backoff, but cap it at the max to prevent overflow + currentBackoff = Math.min(currentBackoff * 2, MAX_BACKOFF_MS); + } + } + } + private RetryUtils() { // utility class }