From 3d37a52b6374dc1183a1739318174be55e494b15 Mon Sep 17 00:00:00 2001 From: brfrn169 Date: Tue, 28 Oct 2025 14:36:45 +0900 Subject: [PATCH] Fix Scanner thread-safety in active transaction managers --- ...nManagedDistributedTransactionManager.java | 39 ++++++++++++++++++- ...nagedTwoPhaseCommitTransactionManager.java | 39 ++++++++++++++++++- 2 files changed, 76 insertions(+), 2 deletions(-) diff --git a/core/src/main/java/com/scalar/db/common/ActiveTransactionManagedDistributedTransactionManager.java b/core/src/main/java/com/scalar/db/common/ActiveTransactionManagedDistributedTransactionManager.java index f5cdb0bcf9..2dc86ebf42 100644 --- a/core/src/main/java/com/scalar/db/common/ActiveTransactionManagedDistributedTransactionManager.java +++ b/core/src/main/java/com/scalar/db/common/ActiveTransactionManagedDistributedTransactionManager.java @@ -21,10 +21,12 @@ import com.scalar.db.exception.transaction.UnknownTransactionStatusException; import com.scalar.db.util.ActiveExpiringMap; import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; +import java.util.Iterator; import java.util.List; import java.util.Optional; import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiConsumer; +import javax.annotation.Nonnull; import javax.annotation.concurrent.ThreadSafe; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -103,6 +105,11 @@ public DistributedTransaction resume(String txId) throws TransactionNotFoundExce CoreError.TRANSACTION_NOT_FOUND.buildMessage(), txId)); } + /** + * The methods of this class are synchronized to be thread-safe because the rollback() method may + * be called from the expiration handler in a different thread while other methods are being + * executed. + */ @VisibleForTesting class ActiveTransaction extends DecoratedDistributedTransaction { @@ -124,7 +131,37 @@ public synchronized List scan(Scan scan) throws CrudException { @Override public synchronized Scanner getScanner(Scan scan) throws CrudException { - return super.getScanner(scan); + Scanner scanner = super.getScanner(scan); + return new Scanner() { + @Override + public Optional one() throws CrudException { + synchronized (ActiveTransaction.this) { + return scanner.one(); + } + } + + @Override + public List all() throws CrudException { + synchronized (ActiveTransaction.this) { + return scanner.all(); + } + } + + @Override + public void close() throws CrudException { + synchronized (ActiveTransaction.this) { + scanner.close(); + } + } + + @Nonnull + @Override + public Iterator iterator() { + synchronized (ActiveTransaction.this) { + return scanner.iterator(); + } + } + }; } /** @deprecated As of release 3.13.0. Will be removed in release 5.0.0. */ diff --git a/core/src/main/java/com/scalar/db/common/ActiveTransactionManagedTwoPhaseCommitTransactionManager.java b/core/src/main/java/com/scalar/db/common/ActiveTransactionManagedTwoPhaseCommitTransactionManager.java index af5d630ef2..820fba1fcc 100644 --- a/core/src/main/java/com/scalar/db/common/ActiveTransactionManagedTwoPhaseCommitTransactionManager.java +++ b/core/src/main/java/com/scalar/db/common/ActiveTransactionManagedTwoPhaseCommitTransactionManager.java @@ -23,10 +23,12 @@ import com.scalar.db.exception.transaction.ValidationException; import com.scalar.db.util.ActiveExpiringMap; import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; +import java.util.Iterator; import java.util.List; import java.util.Optional; import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiConsumer; +import javax.annotation.Nonnull; import javax.annotation.concurrent.ThreadSafe; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -109,6 +111,11 @@ public TwoPhaseCommitTransaction resume(String txId) throws TransactionNotFoundE CoreError.TRANSACTION_NOT_FOUND.buildMessage(), txId)); } + /** + * The methods of this class are synchronized to be thread-safe because the rollback() method may + * be called from the expiration handler in a different thread while other methods are being + * executed. + */ @VisibleForTesting class ActiveTransaction extends DecoratedTwoPhaseCommitTransaction { @@ -130,7 +137,37 @@ public synchronized List scan(Scan scan) throws CrudException { @Override public synchronized Scanner getScanner(Scan scan) throws CrudException { - return super.getScanner(scan); + Scanner scanner = super.getScanner(scan); + return new Scanner() { + @Override + public Optional one() throws CrudException { + synchronized (ActiveTransaction.this) { + return scanner.one(); + } + } + + @Override + public List all() throws CrudException { + synchronized (ActiveTransaction.this) { + return scanner.all(); + } + } + + @Override + public void close() throws CrudException { + synchronized (ActiveTransaction.this) { + scanner.close(); + } + } + + @Nonnull + @Override + public Iterator iterator() { + synchronized (ActiveTransaction.this) { + return scanner.iterator(); + } + } + }; } /** @deprecated As of release 3.13.0. Will be removed in release 5.0.0. */