diff --git a/core/src/main/java/com/scalar/db/common/CoreError.java b/core/src/main/java/com/scalar/db/common/CoreError.java index fa097bad68..a117bc1530 100644 --- a/core/src/main/java/com/scalar/db/common/CoreError.java +++ b/core/src/main/java/com/scalar/db/common/CoreError.java @@ -889,6 +889,36 @@ public enum CoreError implements ScalarDbError { "Object Storage does not support the feature for altering column types", "", ""), + CONSENSUS_COMMIT_SPECIFYING_TRANSACTION_METADATA_COLUMNS_IN_PROJECTION_NOT_ALLOWED( + Category.USER_ERROR, + "0256", + "Specifying transaction metadata columns in the projection is not allowed. Table: %s; Column: %s", + "", + ""), + CONSENSUS_COMMIT_SPECIFYING_TRANSACTION_METADATA_COLUMNS_IN_ORDERING_NOT_ALLOWED( + Category.USER_ERROR, + "0257", + "Specifying transaction metadata columns in the ordering is not allowed. Table: %s; Column: %s", + "", + ""), + CONSENSUS_COMMIT_INDEX_GET_NOT_ALLOWED_IN_SERIALIZABLE( + Category.USER_ERROR, + "0258", + "Get operations using an index is not allowed in the SERIALIZABLE isolation level", + "", + ""), + CONSENSUS_COMMIT_INDEX_SCAN_NOT_ALLOWED_IN_SERIALIZABLE( + Category.USER_ERROR, + "0259", + "Scan operations using an index is not allowed in the SERIALIZABLE isolation level", + "", + ""), + CONSENSUS_COMMIT_CONDITION_ON_INDEXED_COLUMNS_NOT_ALLOWED_IN_CROSS_PARTITION_SCAN_IN_SERIALIZABLE( + Category.USER_ERROR, + "0260", + "Conditions on indexed columns in cross-partition scan operations are not allowed in the SERIALIZABLE isolation level", + "", + ""), // // Errors for the concurrency error category diff --git a/core/src/main/java/com/scalar/db/transaction/consensuscommit/ConsensusCommit.java b/core/src/main/java/com/scalar/db/transaction/consensuscommit/ConsensusCommit.java index 3c8f96cabc..86d045b393 100644 --- a/core/src/main/java/com/scalar/db/transaction/consensuscommit/ConsensusCommit.java +++ b/core/src/main/java/com/scalar/db/transaction/consensuscommit/ConsensusCommit.java @@ -50,7 +50,7 @@ public class ConsensusCommit extends AbstractDistributedTransaction { private final TransactionContext context; private final CrudHandler crud; private final CommitHandler commit; - private final ConsensusCommitMutationOperationChecker mutationOperationChecker; + private final ConsensusCommitOperationChecker operationChecker; @Nullable private final CoordinatorGroupCommitter groupCommitter; @SuppressFBWarnings("EI_EXPOSE_REP2") @@ -58,12 +58,12 @@ public ConsensusCommit( TransactionContext context, CrudHandler crud, CommitHandler commit, - ConsensusCommitMutationOperationChecker mutationOperationChecker, + ConsensusCommitOperationChecker operationChecker, @Nullable CoordinatorGroupCommitter groupCommitter) { this.context = checkNotNull(context); this.crud = checkNotNull(crud); this.commit = checkNotNull(commit); - this.mutationOperationChecker = mutationOperationChecker; + this.operationChecker = operationChecker; this.groupCommitter = groupCommitter; } @@ -74,17 +74,40 @@ public String getId() { @Override public Optional get(Get get) throws CrudException { - return crud.get(copyAndSetTargetToIfNot(get), context); + get = copyAndSetTargetToIfNot(get); + + try { + operationChecker.check(get, context); + } catch (ExecutionException e) { + throw new CrudException(e.getMessage(), e, getId()); + } + + return crud.get(get, context); } @Override public List scan(Scan scan) throws CrudException { - return crud.scan(copyAndSetTargetToIfNot(scan), context); + scan = copyAndSetTargetToIfNot(scan); + + try { + operationChecker.check(scan, context); + } catch (ExecutionException e) { + throw new CrudException(e.getMessage(), e, getId()); + } + + return crud.scan(scan, context); } @Override public Scanner getScanner(Scan scan) throws CrudException { scan = copyAndSetTargetToIfNot(scan); + + try { + operationChecker.check(scan, context); + } catch (ExecutionException e) { + throw new CrudException(e.getMessage(), e, getId()); + } + return crud.getScanner(scan, context); } @@ -230,7 +253,7 @@ void waitForRecoveryCompletion() throws CrudException { private void checkMutation(Mutation mutation) throws CrudException { try { - mutationOperationChecker.check(mutation); + operationChecker.check(mutation); } catch (ExecutionException e) { throw new CrudException(e.getMessage(), e, getId()); } diff --git a/core/src/main/java/com/scalar/db/transaction/consensuscommit/ConsensusCommitManager.java b/core/src/main/java/com/scalar/db/transaction/consensuscommit/ConsensusCommitManager.java index 731afcabe8..c04facb622 100644 --- a/core/src/main/java/com/scalar/db/transaction/consensuscommit/ConsensusCommitManager.java +++ b/core/src/main/java/com/scalar/db/transaction/consensuscommit/ConsensusCommitManager.java @@ -58,7 +58,7 @@ public class ConsensusCommitManager extends AbstractDistributedTransactionManage private final CrudHandler crud; protected final CommitHandler commit; private final Isolation isolation; - private final ConsensusCommitMutationOperationChecker mutationOperationChecker; + private final ConsensusCommitOperationChecker operationChecker; @Nullable private final CoordinatorGroupCommitter groupCommitter; @SuppressFBWarnings("EI_EXPOSE_REP2") @@ -86,7 +86,9 @@ public ConsensusCommitManager( parallelExecutor); commit = createCommitHandler(config); isolation = config.getIsolation(); - mutationOperationChecker = new ConsensusCommitMutationOperationChecker(tableMetadataManager); + operationChecker = + new ConsensusCommitOperationChecker( + tableMetadataManager, config.isIncludeMetadataEnabled()); } protected ConsensusCommitManager(DatabaseConfig databaseConfig) { @@ -113,7 +115,9 @@ protected ConsensusCommitManager(DatabaseConfig databaseConfig) { parallelExecutor); commit = createCommitHandler(config); isolation = config.getIsolation(); - mutationOperationChecker = new ConsensusCommitMutationOperationChecker(tableMetadataManager); + operationChecker = + new ConsensusCommitOperationChecker( + tableMetadataManager, config.isIncludeMetadataEnabled()); } @SuppressFBWarnings("EI_EXPOSE_REP2") @@ -121,6 +125,7 @@ protected ConsensusCommitManager(DatabaseConfig databaseConfig) { ConsensusCommitManager( DistributedStorage storage, DistributedStorageAdmin admin, + ConsensusCommitConfig config, DatabaseConfig databaseConfig, Coordinator coordinator, ParallelExecutor parallelExecutor, @@ -142,8 +147,9 @@ protected ConsensusCommitManager(DatabaseConfig databaseConfig) { this.commit = commit; this.groupCommitter = groupCommitter; this.isolation = isolation; - this.mutationOperationChecker = - new ConsensusCommitMutationOperationChecker(tableMetadataManager); + this.operationChecker = + new ConsensusCommitOperationChecker( + tableMetadataManager, config.isIncludeMetadataEnabled()); } // `groupCommitter` must be set before calling this method. @@ -261,7 +267,7 @@ DistributedTransaction begin( TransactionContext context = new TransactionContext(txId, snapshot, isolation, readOnly, oneOperation); DistributedTransaction transaction = - new ConsensusCommit(context, crud, commit, mutationOperationChecker, groupCommitter); + new ConsensusCommit(context, crud, commit, operationChecker, groupCommitter); if (readOnly) { transaction = new ReadOnlyDistributedTransaction(transaction); } diff --git a/core/src/main/java/com/scalar/db/transaction/consensuscommit/ConsensusCommitMutationOperationChecker.java b/core/src/main/java/com/scalar/db/transaction/consensuscommit/ConsensusCommitMutationOperationChecker.java deleted file mode 100644 index 36c4ea2dda..0000000000 --- a/core/src/main/java/com/scalar/db/transaction/consensuscommit/ConsensusCommitMutationOperationChecker.java +++ /dev/null @@ -1,111 +0,0 @@ -package com.scalar.db.transaction.consensuscommit; - -import static com.scalar.db.transaction.consensuscommit.ConsensusCommitUtils.*; - -import com.google.common.annotations.VisibleForTesting; -import com.scalar.db.api.ConditionalExpression; -import com.scalar.db.api.Delete; -import com.scalar.db.api.DeleteIf; -import com.scalar.db.api.DeleteIfExists; -import com.scalar.db.api.Mutation; -import com.scalar.db.api.MutationCondition; -import com.scalar.db.api.Put; -import com.scalar.db.api.PutIf; -import com.scalar.db.api.PutIfExists; -import com.scalar.db.api.PutIfNotExists; -import com.scalar.db.api.TableMetadata; -import com.scalar.db.common.CoreError; -import com.scalar.db.common.checker.ConditionChecker; -import com.scalar.db.exception.storage.ExecutionException; -import javax.annotation.concurrent.ThreadSafe; - -@ThreadSafe -public class ConsensusCommitMutationOperationChecker { - - private final TransactionTableMetadataManager transactionTableMetadataManager; - - public ConsensusCommitMutationOperationChecker( - TransactionTableMetadataManager transactionTableMetadataManager) { - this.transactionTableMetadataManager = transactionTableMetadataManager; - } - - /** - * Checks the mutation validity - * - * @param mutation a mutation operation - * @throws ExecutionException when retrieving the table metadata fails - * @throws IllegalArgumentException when the mutation is invalid - */ - public void check(Mutation mutation) throws ExecutionException { - if (mutation instanceof Put) { - check((Put) mutation); - } else { - assert mutation instanceof Delete; - check((Delete) mutation); - } - } - - private void check(Put put) throws ExecutionException { - TransactionTableMetadata metadata = - getTransactionTableMetadata(transactionTableMetadataManager, put); - for (String column : put.getContainedColumnNames()) { - if (metadata.getTransactionMetaColumnNames().contains(column)) { - throw new IllegalArgumentException( - CoreError.CONSENSUS_COMMIT_MUTATING_TRANSACTION_METADATA_COLUMNS_NOT_ALLOWED - .buildMessage(put.forFullTableName().get(), column)); - } - } - - if (!put.getCondition().isPresent()) { - return; - } - MutationCondition condition = put.getCondition().get(); - - if (!(condition instanceof PutIf - || condition instanceof PutIfNotExists - || condition instanceof PutIfExists)) { - throw new IllegalArgumentException( - CoreError.CONSENSUS_COMMIT_CONDITION_NOT_ALLOWED_ON_PUT.buildMessage( - condition.getClass().getSimpleName())); - } - checkConditionIsNotTargetingMetadataColumns(condition, metadata); - ConditionChecker conditionChecker = createConditionChecker(metadata.getTableMetadata()); - conditionChecker.check(condition, true); - } - - private void check(Delete delete) throws ExecutionException { - if (!delete.getCondition().isPresent()) { - return; - } - MutationCondition condition = delete.getCondition().get(); - - if (!(condition instanceof DeleteIf || condition instanceof DeleteIfExists)) { - throw new IllegalArgumentException( - CoreError.CONSENSUS_COMMIT_CONDITION_NOT_ALLOWED_ON_DELETE.buildMessage( - condition.getClass().getSimpleName())); - } - TransactionTableMetadata transactionMetadata = - getTransactionTableMetadata(transactionTableMetadataManager, delete); - checkConditionIsNotTargetingMetadataColumns(condition, transactionMetadata); - ConditionChecker conditionChecker = - createConditionChecker(transactionMetadata.getTableMetadata()); - conditionChecker.check(condition, false); - } - - private void checkConditionIsNotTargetingMetadataColumns( - MutationCondition mutationCondition, TransactionTableMetadata metadata) { - for (ConditionalExpression expression : mutationCondition.getExpressions()) { - String column = expression.getColumn().getName(); - if (metadata.getTransactionMetaColumnNames().contains(column)) { - throw new IllegalArgumentException( - CoreError.CONSENSUS_COMMIT_CONDITION_NOT_ALLOWED_TO_TARGET_TRANSACTION_METADATA_COLUMNS - .buildMessage(column)); - } - } - } - - @VisibleForTesting - ConditionChecker createConditionChecker(TableMetadata tableMetadata) { - return new ConditionChecker(tableMetadata); - } -} diff --git a/core/src/main/java/com/scalar/db/transaction/consensuscommit/ConsensusCommitOperationChecker.java b/core/src/main/java/com/scalar/db/transaction/consensuscommit/ConsensusCommitOperationChecker.java new file mode 100644 index 0000000000..ad551c9e66 --- /dev/null +++ b/core/src/main/java/com/scalar/db/transaction/consensuscommit/ConsensusCommitOperationChecker.java @@ -0,0 +1,253 @@ +package com.scalar.db.transaction.consensuscommit; + +import static com.scalar.db.transaction.consensuscommit.ConsensusCommitUtils.*; + +import com.google.common.annotations.VisibleForTesting; +import com.scalar.db.api.ConditionalExpression; +import com.scalar.db.api.Delete; +import com.scalar.db.api.DeleteIf; +import com.scalar.db.api.DeleteIfExists; +import com.scalar.db.api.Get; +import com.scalar.db.api.Mutation; +import com.scalar.db.api.MutationCondition; +import com.scalar.db.api.Put; +import com.scalar.db.api.PutIf; +import com.scalar.db.api.PutIfExists; +import com.scalar.db.api.PutIfNotExists; +import com.scalar.db.api.Scan; +import com.scalar.db.api.ScanAll; +import com.scalar.db.api.Selection; +import com.scalar.db.api.TableMetadata; +import com.scalar.db.common.CoreError; +import com.scalar.db.common.checker.ConditionChecker; +import com.scalar.db.exception.storage.ExecutionException; +import com.scalar.db.util.ScalarDbUtils; +import javax.annotation.concurrent.ThreadSafe; + +@ThreadSafe +public class ConsensusCommitOperationChecker { + + private final TransactionTableMetadataManager transactionTableMetadataManager; + private final boolean isIncludeMetadataEnabled; + + public ConsensusCommitOperationChecker( + TransactionTableMetadataManager transactionTableMetadataManager, + boolean isIncludeMetadataEnabled) { + this.transactionTableMetadataManager = transactionTableMetadataManager; + this.isIncludeMetadataEnabled = isIncludeMetadataEnabled; + } + + /** + * Checks the get validity + * + * @param get a get operation + * @param context a transaction context + * @throws ExecutionException when retrieving the table metadata fails + * @throws IllegalArgumentException when the get is invalid + */ + public void check(Get get, TransactionContext context) throws ExecutionException { + TransactionTableMetadata metadata = + getTransactionTableMetadata(transactionTableMetadataManager, get); + + // Skip checks if including metadata columns is enabled + if (!isIncludeMetadataEnabled) { + // Check projections + for (String column : get.getProjections()) { + if (metadata.getTransactionMetaColumnNames().contains(column)) { + throw new IllegalArgumentException( + CoreError + .CONSENSUS_COMMIT_SPECIFYING_TRANSACTION_METADATA_COLUMNS_IN_PROJECTION_NOT_ALLOWED + .buildMessage(get.forFullTableName().get(), column)); + } + } + + // Check conditions + for (Selection.Conjunction conjunction : get.getConjunctions()) { + for (ConditionalExpression condition : conjunction.getConditions()) { + String column = condition.getColumn().getName(); + if (metadata.getTransactionMetaColumnNames().contains(column)) { + throw new IllegalArgumentException( + CoreError + .CONSENSUS_COMMIT_CONDITION_NOT_ALLOWED_TO_TARGET_TRANSACTION_METADATA_COLUMNS + .buildMessage(get.forFullTableName().get(), column)); + } + } + } + } + + // Additional checks for SERIALIZABLE isolation level + if (context.isolation == Isolation.SERIALIZABLE) { + // Don't allow index gets + TableMetadata tableMetadata = metadata.getTableMetadata(); + if (ScalarDbUtils.isSecondaryIndexSpecified(get, tableMetadata)) { + // If the index column is part of the primary key, it's allowed + String indexKeyColumnName = get.getPartitionKey().getColumns().get(0).getName(); + if (!tableMetadata.getPartitionKeyNames().contains(indexKeyColumnName) + && !tableMetadata.getClusteringKeyNames().contains(indexKeyColumnName)) { + throw new IllegalArgumentException( + CoreError.CONSENSUS_COMMIT_INDEX_GET_NOT_ALLOWED_IN_SERIALIZABLE.buildMessage()); + } + } + } + } + + /** + * Checks the scan validity + * + * @param scan a scan operation + * @param context a transaction context + * @throws ExecutionException when retrieving the table metadata fails + * @throws IllegalArgumentException when the scan is invalid + */ + public void check(Scan scan, TransactionContext context) throws ExecutionException { + TransactionTableMetadata metadata = + getTransactionTableMetadata(transactionTableMetadataManager, scan); + + // Skip checks if including metadata columns is enabled + if (!isIncludeMetadataEnabled) { + // Check projections + for (String column : scan.getProjections()) { + if (metadata.getTransactionMetaColumnNames().contains(column)) { + throw new IllegalArgumentException( + CoreError + .CONSENSUS_COMMIT_SPECIFYING_TRANSACTION_METADATA_COLUMNS_IN_PROJECTION_NOT_ALLOWED + .buildMessage(scan.forFullTableName().get(), column)); + } + } + + // Check conditions + for (Selection.Conjunction conjunction : scan.getConjunctions()) { + for (ConditionalExpression condition : conjunction.getConditions()) { + String column = condition.getColumn().getName(); + if (metadata.getTransactionMetaColumnNames().contains(column)) { + throw new IllegalArgumentException( + CoreError + .CONSENSUS_COMMIT_CONDITION_NOT_ALLOWED_TO_TARGET_TRANSACTION_METADATA_COLUMNS + .buildMessage(scan.forFullTableName().get(), column)); + } + } + } + + // Check orderings + for (Scan.Ordering ordering : scan.getOrderings()) { + String column = ordering.getColumnName(); + if (metadata.getTransactionMetaColumnNames().contains(column)) { + throw new IllegalArgumentException( + CoreError + .CONSENSUS_COMMIT_SPECIFYING_TRANSACTION_METADATA_COLUMNS_IN_ORDERING_NOT_ALLOWED + .buildMessage(scan.forFullTableName().get(), column)); + } + } + } + + // Additional checks for SERIALIZABLE isolation level + if (context.isolation == Isolation.SERIALIZABLE) { + // Don't allow index scans + TableMetadata tableMetadata = metadata.getTableMetadata(); + if (ScalarDbUtils.isSecondaryIndexSpecified(scan, tableMetadata)) { + // If the index column is part of the primary key, it's allowed + String indexKeyColumnName = scan.getPartitionKey().getColumns().get(0).getName(); + if (!tableMetadata.getPartitionKeyNames().contains(indexKeyColumnName) + && !tableMetadata.getClusteringKeyNames().contains(indexKeyColumnName)) { + throw new IllegalArgumentException( + CoreError.CONSENSUS_COMMIT_INDEX_SCAN_NOT_ALLOWED_IN_SERIALIZABLE.buildMessage()); + } + } + + // If the scan is a cross-partition scan (ScanAll), don't allow conditions on indexed columns + if (scan instanceof ScanAll) { + for (Selection.Conjunction conjunction : scan.getConjunctions()) { + for (ConditionalExpression condition : conjunction.getConditions()) { + String column = condition.getColumn().getName(); + if (metadata.getSecondaryIndexNames().contains(column)) { + throw new IllegalArgumentException( + CoreError + .CONSENSUS_COMMIT_CONDITION_ON_INDEXED_COLUMNS_NOT_ALLOWED_IN_CROSS_PARTITION_SCAN_IN_SERIALIZABLE + .buildMessage()); + } + } + } + } + } + } + + /** + * Checks the mutation validity + * + * @param mutation a mutation operation + * @throws ExecutionException when retrieving the table metadata fails + * @throws IllegalArgumentException when the mutation is invalid + */ + public void check(Mutation mutation) throws ExecutionException { + if (mutation instanceof Put) { + check((Put) mutation); + } else { + assert mutation instanceof Delete; + check((Delete) mutation); + } + } + + private void check(Put put) throws ExecutionException { + TransactionTableMetadata metadata = + getTransactionTableMetadata(transactionTableMetadataManager, put); + for (String column : put.getContainedColumnNames()) { + if (metadata.getTransactionMetaColumnNames().contains(column)) { + throw new IllegalArgumentException( + CoreError.CONSENSUS_COMMIT_MUTATING_TRANSACTION_METADATA_COLUMNS_NOT_ALLOWED + .buildMessage(put.forFullTableName().get(), column)); + } + } + + if (!put.getCondition().isPresent()) { + return; + } + MutationCondition condition = put.getCondition().get(); + + if (!(condition instanceof PutIf + || condition instanceof PutIfNotExists + || condition instanceof PutIfExists)) { + throw new IllegalArgumentException( + CoreError.CONSENSUS_COMMIT_CONDITION_NOT_ALLOWED_ON_PUT.buildMessage( + condition.getClass().getSimpleName())); + } + checkConditionIsNotTargetingMetadataColumns(put, condition, metadata); + ConditionChecker conditionChecker = createConditionChecker(metadata.getTableMetadata()); + conditionChecker.check(condition, true); + } + + private void check(Delete delete) throws ExecutionException { + if (!delete.getCondition().isPresent()) { + return; + } + MutationCondition condition = delete.getCondition().get(); + + if (!(condition instanceof DeleteIf || condition instanceof DeleteIfExists)) { + throw new IllegalArgumentException( + CoreError.CONSENSUS_COMMIT_CONDITION_NOT_ALLOWED_ON_DELETE.buildMessage( + condition.getClass().getSimpleName())); + } + TransactionTableMetadata transactionMetadata = + getTransactionTableMetadata(transactionTableMetadataManager, delete); + checkConditionIsNotTargetingMetadataColumns(delete, condition, transactionMetadata); + ConditionChecker conditionChecker = + createConditionChecker(transactionMetadata.getTableMetadata()); + conditionChecker.check(condition, false); + } + + private void checkConditionIsNotTargetingMetadataColumns( + Mutation mutation, MutationCondition mutationCondition, TransactionTableMetadata metadata) { + for (ConditionalExpression expression : mutationCondition.getExpressions()) { + String column = expression.getColumn().getName(); + if (metadata.getTransactionMetaColumnNames().contains(column)) { + throw new IllegalArgumentException( + CoreError.CONSENSUS_COMMIT_CONDITION_NOT_ALLOWED_TO_TARGET_TRANSACTION_METADATA_COLUMNS + .buildMessage(mutation.forFullTableName().get(), column)); + } + } + } + + @VisibleForTesting + ConditionChecker createConditionChecker(TableMetadata tableMetadata) { + return new ConditionChecker(tableMetadata); + } +} diff --git a/core/src/main/java/com/scalar/db/transaction/consensuscommit/ConsensusCommitUtils.java b/core/src/main/java/com/scalar/db/transaction/consensuscommit/ConsensusCommitUtils.java index 6ad6f9f9fc..f83d627c2d 100644 --- a/core/src/main/java/com/scalar/db/transaction/consensuscommit/ConsensusCommitUtils.java +++ b/core/src/main/java/com/scalar/db/transaction/consensuscommit/ConsensusCommitUtils.java @@ -15,6 +15,7 @@ import com.scalar.db.api.Put; import com.scalar.db.api.PutBuilder; import com.scalar.db.api.Scan; +import com.scalar.db.api.ScanAll; import com.scalar.db.api.ScanBuilder; import com.scalar.db.api.Selection; import com.scalar.db.api.TableMetadata; @@ -370,7 +371,7 @@ static Get prepareGetForStorage(Get get, TableMetadata metadata) { if (!get.getConjunctions().isEmpty()) { // If there are conjunctions, we need to convert them to include conditions on the before // image - Set converted = convertConjunctions(get.getConjunctions(), metadata); + Set converted = convertConjunctions(get.getConjunctions(), metadata, true); return builder.clearConditions().whereOr(converted).build(); } @@ -390,7 +391,13 @@ static Scan prepareScanForStorage(Scan scan, TableMetadata metadata) { if (!scan.getConjunctions().isEmpty()) { // If there are conjunctions, we need to convert them to include conditions on the before // image - Set converted = convertConjunctions(scan.getConjunctions(), metadata); + + // If the scan is a cross-partition scan (ScanAll), we don't convert conditions on indexed + // columns + boolean convertIndexedColumns = !(scan instanceof ScanAll); + + Set converted = + convertConjunctions(scan.getConjunctions(), metadata, convertIndexedColumns); return builder.clearConditions().whereOr(converted).build(); } @@ -460,10 +467,13 @@ static Scan prepareScanForStorage(Scan scan, TableMetadata metadata) { * * @param conjunctions the conjunctions to convert * @param metadata the table metadata of the target table + * @param convertIndexedColumns whether to convert conditions on indexed columns * @return the converted conjunctions */ private static Set convertConjunctions( - Set conjunctions, TableMetadata metadata) { + Set conjunctions, + TableMetadata metadata, + boolean convertIndexedColumns) { Set converted = new HashSet<>(conjunctions.size() * 2); // Keep the original conjunctions @@ -483,6 +493,11 @@ private static Set convertConjunctions( continue; } + if (!convertIndexedColumns && metadata.getSecondaryIndexNames().contains(columnName)) { + conditions.add(condition); + continue; + } + // Convert the condition to use the before image column ConditionalExpression convertedCondition; if (condition instanceof LikeExpression) { diff --git a/core/src/main/java/com/scalar/db/transaction/consensuscommit/CrudHandler.java b/core/src/main/java/com/scalar/db/transaction/consensuscommit/CrudHandler.java index df6e688be0..d67349bfc6 100644 --- a/core/src/main/java/com/scalar/db/transaction/consensuscommit/CrudHandler.java +++ b/core/src/main/java/com/scalar/db/transaction/consensuscommit/CrudHandler.java @@ -371,7 +371,7 @@ public void put(Put put, TransactionContext context) throws CrudException { read(key, createGet(key), context, metadata); } mutationConditionsValidator.checkIfConditionIsSatisfied( - put, context.snapshot.getResult(key).orElse(null), context); + put, context.snapshot.getResult(key).orElse(null), context.transactionId); } context.snapshot.putIntoWriteSet(key, put); @@ -386,7 +386,7 @@ public void delete(Delete delete, TransactionContext context) throws CrudExcepti read(key, createGet(key), context, metadata); } mutationConditionsValidator.checkIfConditionIsSatisfied( - delete, context.snapshot.getResult(key).orElse(null), context); + delete, context.snapshot.getResult(key).orElse(null), context.transactionId); } context.snapshot.putIntoDeleteSet(key, delete); diff --git a/core/src/main/java/com/scalar/db/transaction/consensuscommit/MutationConditionsValidator.java b/core/src/main/java/com/scalar/db/transaction/consensuscommit/MutationConditionsValidator.java index 5e9b4c5767..862f3ea0d0 100644 --- a/core/src/main/java/com/scalar/db/transaction/consensuscommit/MutationConditionsValidator.java +++ b/core/src/main/java/com/scalar/db/transaction/consensuscommit/MutationConditionsValidator.java @@ -31,28 +31,28 @@ public class MutationConditionsValidator { * * @param put a Put operation * @param existingRecord the current value of the record targeted by the mutation, if any - * @param context the transaction context + * @param transactionId the transaction ID * @throws UnsatisfiedConditionException if the condition is not satisfied */ public void checkIfConditionIsSatisfied( - Put put, @Nullable TransactionResult existingRecord, TransactionContext context) + Put put, @Nullable TransactionResult existingRecord, String transactionId) throws UnsatisfiedConditionException { assert put.getCondition().isPresent(); MutationCondition condition = put.getCondition().get(); boolean recordExists = existingRecord != null; if (condition instanceof PutIf) { if (recordExists) { - validateConditionalExpressions(condition.getExpressions(), existingRecord, context); + validateConditionalExpressions(condition.getExpressions(), existingRecord, transactionId); } else { - throwWhenRecordDoesNotExist(condition, context); + throwWhenRecordDoesNotExist(condition, transactionId); } } else if (condition instanceof PutIfExists) { if (!recordExists) { - throwWhenRecordDoesNotExist(condition, context); + throwWhenRecordDoesNotExist(condition, transactionId); } } else if (condition instanceof PutIfNotExists) { if (recordExists) { - throwWhenRecordExists(condition, context); + throwWhenRecordExists(condition, transactionId); } } else { throw new AssertionError(); @@ -65,50 +65,50 @@ public void checkIfConditionIsSatisfied( * * @param delete a Delete operation * @param existingRecord the current value of the record targeted by the mutation, if any - * @param context the transaction context + * @param transactionId the transaction ID * @throws UnsatisfiedConditionException if the condition is not satisfied */ public void checkIfConditionIsSatisfied( - Delete delete, @Nullable TransactionResult existingRecord, TransactionContext context) + Delete delete, @Nullable TransactionResult existingRecord, String transactionId) throws UnsatisfiedConditionException { assert delete.getCondition().isPresent(); MutationCondition condition = delete.getCondition().get(); boolean recordExists = existingRecord != null; if (condition instanceof DeleteIf) { if (recordExists) { - validateConditionalExpressions(condition.getExpressions(), existingRecord, context); + validateConditionalExpressions(condition.getExpressions(), existingRecord, transactionId); } else { - throwWhenRecordDoesNotExist(condition, context); + throwWhenRecordDoesNotExist(condition, transactionId); } } else if (condition instanceof DeleteIfExists) { if (!recordExists) { - throwWhenRecordDoesNotExist(condition, context); + throwWhenRecordDoesNotExist(condition, transactionId); } } else { throw new AssertionError(); } } - private void throwWhenRecordDoesNotExist(MutationCondition condition, TransactionContext context) + private void throwWhenRecordDoesNotExist(MutationCondition condition, String transactionId) throws UnsatisfiedConditionException { throw new UnsatisfiedConditionException( CoreError.CONSENSUS_COMMIT_CONDITION_NOT_SATISFIED_BECAUSE_RECORD_NOT_EXISTS.buildMessage( condition.getClass().getSimpleName()), - context.transactionId); + transactionId); } - private void throwWhenRecordExists(MutationCondition condition, TransactionContext context) + private void throwWhenRecordExists(MutationCondition condition, String transactionId) throws UnsatisfiedConditionException { throw new UnsatisfiedConditionException( CoreError.CONSENSUS_COMMIT_CONDITION_NOT_SATISFIED_BECAUSE_RECORD_EXISTS.buildMessage( condition.getClass().getSimpleName()), - context.transactionId); + transactionId); } private void validateConditionalExpressions( List conditionalExpressions, TransactionResult existingRecord, - TransactionContext context) + String transactionId) throws UnsatisfiedConditionException { for (ConditionalExpression conditionalExpression : conditionalExpressions) { if (!shouldMutate( @@ -118,7 +118,7 @@ private void validateConditionalExpressions( throw new UnsatisfiedConditionException( CoreError.CONSENSUS_COMMIT_CONDITION_NOT_SATISFIED.buildMessage( conditionalExpression.getColumn().getName()), - context.transactionId); + transactionId); } } } diff --git a/core/src/main/java/com/scalar/db/transaction/consensuscommit/TwoPhaseConsensusCommit.java b/core/src/main/java/com/scalar/db/transaction/consensuscommit/TwoPhaseConsensusCommit.java index f1a6b10625..4b6b4626c5 100644 --- a/core/src/main/java/com/scalar/db/transaction/consensuscommit/TwoPhaseConsensusCommit.java +++ b/core/src/main/java/com/scalar/db/transaction/consensuscommit/TwoPhaseConsensusCommit.java @@ -39,7 +39,7 @@ public class TwoPhaseConsensusCommit extends AbstractTwoPhaseCommitTransaction { private final TransactionContext context; private final CrudHandler crud; private final CommitHandler commit; - private final ConsensusCommitMutationOperationChecker mutationOperationChecker; + private final ConsensusCommitOperationChecker operationChecker; private boolean validated; private boolean needRollback; @@ -48,11 +48,11 @@ public TwoPhaseConsensusCommit( TransactionContext context, CrudHandler crud, CommitHandler commit, - ConsensusCommitMutationOperationChecker mutationOperationChecker) { + ConsensusCommitOperationChecker operationChecker) { this.context = context; this.crud = crud; this.commit = commit; - this.mutationOperationChecker = mutationOperationChecker; + this.operationChecker = operationChecker; } @Override @@ -62,17 +62,41 @@ public String getId() { @Override public Optional get(Get get) throws CrudException { - return crud.get(copyAndSetTargetToIfNot(get), context); + get = copyAndSetTargetToIfNot(get); + + try { + operationChecker.check(get, context); + } catch (ExecutionException e) { + throw new CrudException(e.getMessage(), e, getId()); + } + + return crud.get(get, context); } @Override public List scan(Scan scan) throws CrudException { - return crud.scan(copyAndSetTargetToIfNot(scan), context); + scan = copyAndSetTargetToIfNot(scan); + + try { + operationChecker.check(scan, context); + } catch (ExecutionException e) { + throw new CrudException(e.getMessage(), e, getId()); + } + + return crud.scan(scan, context); } @Override public Scanner getScanner(Scan scan) throws CrudException { - return crud.getScanner(copyAndSetTargetToIfNot(scan), context); + scan = copyAndSetTargetToIfNot(scan); + + try { + operationChecker.check(scan, context); + } catch (ExecutionException e) { + throw new CrudException(e.getMessage(), e, getId()); + } + + return crud.getScanner(scan, context); } /** @deprecated As of release 3.13.0. Will be removed in release 5.0.0. */ @@ -270,7 +294,7 @@ void waitForRecoveryCompletion() throws CrudException { private void checkMutation(Mutation mutation) throws CrudException { try { - mutationOperationChecker.check(mutation); + operationChecker.check(mutation); } catch (ExecutionException e) { throw new CrudException(e.getMessage(), e, getId()); } diff --git a/core/src/main/java/com/scalar/db/transaction/consensuscommit/TwoPhaseConsensusCommitManager.java b/core/src/main/java/com/scalar/db/transaction/consensuscommit/TwoPhaseConsensusCommitManager.java index 4a9b7c034a..9f6058258b 100644 --- a/core/src/main/java/com/scalar/db/transaction/consensuscommit/TwoPhaseConsensusCommitManager.java +++ b/core/src/main/java/com/scalar/db/transaction/consensuscommit/TwoPhaseConsensusCommitManager.java @@ -61,7 +61,7 @@ public class TwoPhaseConsensusCommitManager extends AbstractTwoPhaseCommitTransa private final RecoveryExecutor recoveryExecutor; private final CrudHandler crud; private final CommitHandler commit; - private final ConsensusCommitMutationOperationChecker mutationOperationChecker; + private final ConsensusCommitOperationChecker operationChecker; @SuppressFBWarnings("EI_EXPOSE_REP2") @Inject @@ -94,7 +94,9 @@ public TwoPhaseConsensusCommitManager( new MutationsGrouper(new StorageInfoProvider(admin)), config.isCoordinatorWriteOmissionOnReadOnlyEnabled(), config.isOnePhaseCommitEnabled()); - mutationOperationChecker = new ConsensusCommitMutationOperationChecker(tableMetadataManager); + operationChecker = + new ConsensusCommitOperationChecker( + tableMetadataManager, config.isIncludeMetadataEnabled()); } public TwoPhaseConsensusCommitManager(DatabaseConfig databaseConfig) { @@ -126,7 +128,9 @@ public TwoPhaseConsensusCommitManager(DatabaseConfig databaseConfig) { new MutationsGrouper(new StorageInfoProvider(admin)), config.isCoordinatorWriteOmissionOnReadOnlyEnabled(), config.isOnePhaseCommitEnabled()); - mutationOperationChecker = new ConsensusCommitMutationOperationChecker(tableMetadataManager); + operationChecker = + new ConsensusCommitOperationChecker( + tableMetadataManager, config.isIncludeMetadataEnabled()); } @SuppressFBWarnings("EI_EXPOSE_REP2") @@ -153,7 +157,9 @@ public TwoPhaseConsensusCommitManager(DatabaseConfig databaseConfig) { this.recoveryExecutor = recoveryExecutor; this.crud = crud; this.commit = commit; - mutationOperationChecker = new ConsensusCommitMutationOperationChecker(tableMetadataManager); + operationChecker = + new ConsensusCommitOperationChecker( + tableMetadataManager, config.isIncludeMetadataEnabled()); } private void throwIfGroupCommitIsEnabled() { @@ -206,7 +212,7 @@ TwoPhaseCommitTransaction begin( TransactionContext context = new TransactionContext(txId, snapshot, isolation, readOnly, oneOperation); TwoPhaseConsensusCommit transaction = - new TwoPhaseConsensusCommit(context, crud, commit, mutationOperationChecker); + new TwoPhaseConsensusCommit(context, crud, commit, operationChecker); getNamespace().ifPresent(transaction::withNamespace); getTable().ifPresent(transaction::withTable); return transaction; diff --git a/core/src/test/java/com/scalar/db/storage/jdbc/JdbcOperationCheckerTest.java b/core/src/test/java/com/scalar/db/storage/jdbc/JdbcOperationCheckerTest.java index 716014ddae..e5c18d203c 100644 --- a/core/src/test/java/com/scalar/db/storage/jdbc/JdbcOperationCheckerTest.java +++ b/core/src/test/java/com/scalar/db/storage/jdbc/JdbcOperationCheckerTest.java @@ -7,7 +7,6 @@ import static org.mockito.Mockito.when; import com.google.common.collect.Sets; -import com.scalar.db.api.Scan; import com.scalar.db.api.ScanAll; import com.scalar.db.api.Selection; import com.scalar.db.api.Selection.Conjunction; @@ -30,7 +29,6 @@ public class JdbcOperationCheckerTest { @Mock private StorageInfoProvider storageInfoProvider; @Mock private RdbEngineStrategy rdbEngine; @Mock private ScanAll scanAll; - @Mock private Scan scan; @Mock private Selection selection; @Mock private TableMetadata tableMetadata; private JdbcOperationChecker operationChecker; diff --git a/core/src/test/java/com/scalar/db/transaction/consensuscommit/ConsensusCommitManagerTest.java b/core/src/test/java/com/scalar/db/transaction/consensuscommit/ConsensusCommitManagerTest.java index 5690ca5add..185df8380a 100644 --- a/core/src/test/java/com/scalar/db/transaction/consensuscommit/ConsensusCommitManagerTest.java +++ b/core/src/test/java/com/scalar/db/transaction/consensuscommit/ConsensusCommitManagerTest.java @@ -60,6 +60,7 @@ public class ConsensusCommitManagerTest { @Mock private DistributedStorage storage; @Mock private DistributedStorageAdmin admin; + @Mock private ConsensusCommitConfig consensusCommitConfig; @Mock private DatabaseConfig databaseConfig; @Mock private Coordinator coordinator; @Mock private ParallelExecutor parallelExecutor; @@ -77,6 +78,7 @@ public void setUp() throws Exception { new ConsensusCommitManager( storage, admin, + consensusCommitConfig, databaseConfig, coordinator, parallelExecutor, @@ -131,6 +133,7 @@ public void begin_TxIdGiven_ReturnWithSpecifiedTxIdAndSnapshotIsolation() { new ConsensusCommitManager( storage, admin, + consensusCommitConfig, databaseConfig, coordinator, parallelExecutor, @@ -164,6 +167,7 @@ public void begin_TxIdGiven_ReturnWithSpecifiedTxIdAndSnapshotIsolation() { new ConsensusCommitManager( storage, admin, + consensusCommitConfig, databaseConfig, coordinator, parallelExecutor, diff --git a/core/src/test/java/com/scalar/db/transaction/consensuscommit/ConsensusCommitMutationOperationCheckerTest.java b/core/src/test/java/com/scalar/db/transaction/consensuscommit/ConsensusCommitMutationOperationCheckerTest.java deleted file mode 100644 index 330703f0b5..0000000000 --- a/core/src/test/java/com/scalar/db/transaction/consensuscommit/ConsensusCommitMutationOperationCheckerTest.java +++ /dev/null @@ -1,203 +0,0 @@ -package com.scalar.db.transaction.consensuscommit; - -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.spy; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; - -import com.google.common.collect.ImmutableSet; -import com.scalar.db.api.ConditionBuilder; -import com.scalar.db.api.Delete; -import com.scalar.db.api.DeleteIf; -import com.scalar.db.api.DeleteIfExists; -import com.scalar.db.api.MutationCondition; -import com.scalar.db.api.Put; -import com.scalar.db.api.PutIf; -import com.scalar.db.api.PutIfExists; -import com.scalar.db.api.PutIfNotExists; -import com.scalar.db.common.checker.ConditionChecker; -import com.scalar.db.exception.storage.ExecutionException; -import java.util.LinkedHashSet; -import java.util.Optional; -import java.util.Set; -import org.assertj.core.api.Assertions; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.params.ParameterizedTest; -import org.junit.jupiter.params.provider.ValueSource; -import org.mockito.Mock; -import org.mockito.MockitoAnnotations; - -public class ConsensusCommitMutationOperationCheckerTest { - private static final String ANY_COL_1 = "any_col_1"; - private static final String ANY_COL_2 = "any_col_2"; - private static final String ANY_METADATA_COL_1 = "any_metadata_col_1"; - private static final String ANY_METADATA_COL_2 = "any_metadata_col_2"; - @Mock private TransactionTableMetadataManager metadataManager; - @Mock private Put put; - @Mock private Delete delete; - @Mock private TransactionTableMetadata tableMetadata; - @Mock private ConditionChecker conditionChecker; - private ConsensusCommitMutationOperationChecker checker; - - @BeforeEach - public void setUp() throws Exception { - MockitoAnnotations.openMocks(this).close(); - checker = spy(new ConsensusCommitMutationOperationChecker(metadataManager)); - when(checker.createConditionChecker(any())).thenReturn(conditionChecker); - when(metadataManager.getTransactionTableMetadata(any())).thenReturn(tableMetadata); - LinkedHashSet metadataColumns = new LinkedHashSet<>(); - metadataColumns.add(ANY_METADATA_COL_1); - metadataColumns.add(ANY_METADATA_COL_2); - when(tableMetadata.getTransactionMetaColumnNames()).thenReturn(metadataColumns); - } - - @ParameterizedTest - @ValueSource(classes = {DeleteIf.class, DeleteIfExists.class}) - public void checkForPut_WithNonAllowedCondition_ShouldThrowIllegalArgumentException( - Class deleteConditonClass) { - // Arrange - when(put.getCondition()).thenReturn(Optional.of(mock(deleteConditonClass))); - - // Act Assert - Assertions.assertThatThrownBy(() -> checker.check(put)) - .isInstanceOf(IllegalArgumentException.class); - } - - @ParameterizedTest - @ValueSource(classes = {PutIf.class, PutIfExists.class, PutIfNotExists.class}) - public void checkForPut_WithAllowedCondition_ShouldCallConditionChecker( - Class putConditionClass) { - // Arrange - MutationCondition condition = mock(putConditionClass); - when(put.getCondition()).thenReturn(Optional.of(condition)); - - // Act Assert - org.junit.jupiter.api.Assertions.assertDoesNotThrow(() -> checker.check(put)); - verify(conditionChecker).check(condition, true); - } - - @Test - public void checkForPut_ThatMutatesMetadataColumns_ShouldThrowIllegalArgumentException() - throws ExecutionException { - // Arrange - String fullTableName = "ns.tbl"; - Set columns = ImmutableSet.of(ANY_COL_1, ANY_METADATA_COL_1, ANY_COL_2); - when(put.forFullTableName()).thenReturn(Optional.of(fullTableName)); - when(put.getContainedColumnNames()).thenReturn(columns); - - // Act Assert - Assertions.assertThatThrownBy(() -> checker.check(put)) - .isInstanceOf(IllegalArgumentException.class) - .hasMessageContaining(fullTableName) - .hasMessageContaining(ANY_METADATA_COL_1); - verify(metadataManager).getTransactionTableMetadata(put); - } - - @Test - public void checkForPut_ThatDoNotMutateMetadataColumns_ShouldDoNothing() - throws ExecutionException { - // Arrange - Set columns = ImmutableSet.of(ANY_COL_1, ANY_COL_2); - when(put.getContainedColumnNames()).thenReturn(columns); - - // Act Assert - org.junit.jupiter.api.Assertions.assertDoesNotThrow(() -> checker.check(put)); - verify(metadataManager).getTransactionTableMetadata(put); - } - - @Test - public void - checkForPut_WithConditionThatTargetMetadataColumns_ShouldThrowIllegalArgumentException() - throws ExecutionException { - // Arrange - MutationCondition condition = - ConditionBuilder.putIf(ConditionBuilder.column(ANY_COL_1).isNullInt()) - .and(ConditionBuilder.column(ANY_METADATA_COL_1).isNullText()) - .build(); - when(put.getCondition()).thenReturn(Optional.of(condition)); - - // Act Assert - Assertions.assertThatThrownBy(() -> checker.check(put)) - .isInstanceOf(IllegalArgumentException.class) - .hasMessageContaining(ANY_METADATA_COL_1); - verify(metadataManager).getTransactionTableMetadata(put); - } - - @Test - public void checkForPut_WithConditionThatDoNotTargetMetadataColumns_ShouldCallConditionChecker() - throws ExecutionException { - // Arrange - MutationCondition condition = - ConditionBuilder.putIf(ConditionBuilder.column(ANY_COL_1).isNullInt()) - .and(ConditionBuilder.column(ANY_COL_2).isNullText()) - .build(); - when(put.getCondition()).thenReturn(Optional.of(condition)); - - // Act Assert - org.junit.jupiter.api.Assertions.assertDoesNotThrow(() -> checker.check(put)); - verify(metadataManager).getTransactionTableMetadata(put); - verify(conditionChecker).check(condition, true); - } - - @ParameterizedTest - @ValueSource(classes = {PutIf.class, PutIfExists.class, PutIfNotExists.class}) - public void checkForDelete_WithNonAllowedCondition_ShouldThrowIllegalArgumentException( - Class putConditionClass) { - // Arrange - when(delete.getCondition()).thenReturn(Optional.of(mock(putConditionClass))); - - // Act Assert - Assertions.assertThatThrownBy(() -> checker.check(delete)) - .isInstanceOf(IllegalArgumentException.class); - } - - @ParameterizedTest - @ValueSource(classes = {DeleteIf.class, DeleteIfExists.class}) - public void checkForDelete_WithAllowedCondition_ShouldCheckCondition( - Class deleteConditionClass) { - // Arrange - MutationCondition condition = mock(deleteConditionClass); - when(delete.getCondition()).thenReturn(Optional.of(condition)); - - // Act Assert - org.junit.jupiter.api.Assertions.assertDoesNotThrow(() -> checker.check(delete)); - verify(conditionChecker).check(condition, false); - } - - @Test - public void - checkForDelete_WithConditionThatTargetMetadataColumns_ShouldThrowIllegalArgumentException() - throws ExecutionException { - // Arrange - MutationCondition condition = - ConditionBuilder.deleteIf(ConditionBuilder.column(ANY_COL_1).isNullInt()) - .and(ConditionBuilder.column(ANY_METADATA_COL_1).isNullText()) - .build(); - when(delete.getCondition()).thenReturn(Optional.of(condition)); - - // Act Assert - Assertions.assertThatThrownBy(() -> checker.check(delete)) - .isInstanceOf(IllegalArgumentException.class) - .hasMessageContaining(ANY_METADATA_COL_1); - verify(metadataManager).getTransactionTableMetadata(delete); - } - - @Test - public void - checkForDelete_WithConditionThatDoNotTargetMetadataColumns_ShouldCallConditionChecker() - throws ExecutionException { - // Arrange - MutationCondition condition = - ConditionBuilder.deleteIf(ConditionBuilder.column(ANY_COL_1).isNullInt()) - .and(ConditionBuilder.column(ANY_COL_2).isNullText()) - .build(); - when(delete.getCondition()).thenReturn(Optional.of(condition)); - - // Act Assert - org.junit.jupiter.api.Assertions.assertDoesNotThrow(() -> checker.check(delete)); - verify(metadataManager).getTransactionTableMetadata(delete); - verify(conditionChecker).check(condition, false); - } -} diff --git a/core/src/test/java/com/scalar/db/transaction/consensuscommit/ConsensusCommitOperationCheckerTest.java b/core/src/test/java/com/scalar/db/transaction/consensuscommit/ConsensusCommitOperationCheckerTest.java new file mode 100644 index 0000000000..1ac1579816 --- /dev/null +++ b/core/src/test/java/com/scalar/db/transaction/consensuscommit/ConsensusCommitOperationCheckerTest.java @@ -0,0 +1,579 @@ +package com.scalar.db.transaction.consensuscommit; + +import static org.assertj.core.api.Assertions.assertThatCode; +import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import com.google.common.collect.ImmutableSet; +import com.scalar.db.api.ConditionBuilder; +import com.scalar.db.api.Delete; +import com.scalar.db.api.DeleteIf; +import com.scalar.db.api.DeleteIfExists; +import com.scalar.db.api.Get; +import com.scalar.db.api.MutationCondition; +import com.scalar.db.api.Put; +import com.scalar.db.api.PutIf; +import com.scalar.db.api.PutIfExists; +import com.scalar.db.api.PutIfNotExists; +import com.scalar.db.api.Scan; +import com.scalar.db.api.ScanAll; +import com.scalar.db.api.TableMetadata; +import com.scalar.db.common.checker.ConditionChecker; +import com.scalar.db.exception.storage.ExecutionException; +import com.scalar.db.io.DataType; +import com.scalar.db.io.Key; +import java.util.Collections; +import java.util.LinkedHashSet; +import java.util.Optional; +import java.util.Set; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; + +public class ConsensusCommitOperationCheckerTest { + private static final String ANY_COL_1 = "any_col_1"; + private static final String ANY_COL_2 = "any_col_2"; + private static final String ANY_METADATA_COL_1 = "any_metadata_col_1"; + private static final String ANY_METADATA_COL_2 = "any_metadata_col_2"; + @Mock private TransactionTableMetadataManager metadataManager; + @Mock private Put put; + @Mock private Delete delete; + @Mock private TransactionTableMetadata tableMetadata; + @Mock private ConditionChecker conditionChecker; + private ConsensusCommitOperationChecker checker; + + @BeforeEach + public void setUp() throws Exception { + MockitoAnnotations.openMocks(this).close(); + checker = spy(new ConsensusCommitOperationChecker(metadataManager, false)); + when(checker.createConditionChecker(any())).thenReturn(conditionChecker); + when(metadataManager.getTransactionTableMetadata(any())).thenReturn(tableMetadata); + LinkedHashSet metadataColumns = new LinkedHashSet<>(); + metadataColumns.add(ANY_METADATA_COL_1); + metadataColumns.add(ANY_METADATA_COL_2); + when(tableMetadata.getTransactionMetaColumnNames()).thenReturn(metadataColumns); + } + + @ParameterizedTest + @ValueSource(classes = {DeleteIf.class, DeleteIfExists.class}) + public void checkForPut_WithNonAllowedCondition_ShouldThrowIllegalArgumentException( + Class deleteConditonClass) { + // Arrange + when(put.getCondition()).thenReturn(Optional.of(mock(deleteConditonClass))); + + // Act Assert + assertThatThrownBy(() -> checker.check(put)).isInstanceOf(IllegalArgumentException.class); + } + + @ParameterizedTest + @ValueSource(classes = {PutIf.class, PutIfExists.class, PutIfNotExists.class}) + public void checkForPut_WithAllowedCondition_ShouldCallConditionChecker( + Class putConditionClass) { + // Arrange + MutationCondition condition = mock(putConditionClass); + when(put.getCondition()).thenReturn(Optional.of(condition)); + + // Act Assert + assertThatCode(() -> checker.check(put)).doesNotThrowAnyException(); + verify(conditionChecker).check(condition, true); + } + + @Test + public void checkForPut_ThatMutatesMetadataColumns_ShouldThrowIllegalArgumentException() + throws ExecutionException { + // Arrange + String fullTableName = "ns.tbl"; + Set columns = ImmutableSet.of(ANY_COL_1, ANY_METADATA_COL_1, ANY_COL_2); + when(put.forFullTableName()).thenReturn(Optional.of(fullTableName)); + when(put.getContainedColumnNames()).thenReturn(columns); + + // Act Assert + assertThatThrownBy(() -> checker.check(put)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining(fullTableName) + .hasMessageContaining(ANY_METADATA_COL_1); + verify(metadataManager).getTransactionTableMetadata(put); + } + + @Test + public void checkForPut_ThatDoNotMutateMetadataColumns_ShouldDoNothing() + throws ExecutionException { + // Arrange + Set columns = ImmutableSet.of(ANY_COL_1, ANY_COL_2); + when(put.getContainedColumnNames()).thenReturn(columns); + + // Act Assert + assertThatCode(() -> checker.check(put)).doesNotThrowAnyException(); + verify(metadataManager).getTransactionTableMetadata(put); + } + + @Test + public void + checkForPut_WithConditionThatTargetMetadataColumns_ShouldThrowIllegalArgumentException() + throws ExecutionException { + // Arrange + MutationCondition condition = + ConditionBuilder.putIf(ConditionBuilder.column(ANY_COL_1).isNullInt()) + .and(ConditionBuilder.column(ANY_METADATA_COL_1).isNullText()) + .build(); + when(put.getCondition()).thenReturn(Optional.of(condition)); + when(put.forFullTableName()).thenReturn(Optional.of("ns.tbl")); + + // Act Assert + assertThatThrownBy(() -> checker.check(put)).isInstanceOf(IllegalArgumentException.class); + verify(metadataManager).getTransactionTableMetadata(put); + } + + @Test + public void checkForPut_WithConditionThatDoNotTargetMetadataColumns_ShouldCallConditionChecker() + throws ExecutionException { + // Arrange + MutationCondition condition = + ConditionBuilder.putIf(ConditionBuilder.column(ANY_COL_1).isNullInt()) + .and(ConditionBuilder.column(ANY_COL_2).isNullText()) + .build(); + when(put.getCondition()).thenReturn(Optional.of(condition)); + + // Act Assert + assertThatCode(() -> checker.check(put)).doesNotThrowAnyException(); + verify(metadataManager).getTransactionTableMetadata(put); + verify(conditionChecker).check(condition, true); + } + + @ParameterizedTest + @ValueSource(classes = {PutIf.class, PutIfExists.class, PutIfNotExists.class}) + public void checkForDelete_WithNonAllowedCondition_ShouldThrowIllegalArgumentException( + Class putConditionClass) { + // Arrange + when(delete.getCondition()).thenReturn(Optional.of(mock(putConditionClass))); + + // Act Assert + assertThatThrownBy(() -> checker.check(delete)).isInstanceOf(IllegalArgumentException.class); + } + + @ParameterizedTest + @ValueSource(classes = {DeleteIf.class, DeleteIfExists.class}) + public void checkForDelete_WithAllowedCondition_ShouldCheckCondition( + Class deleteConditionClass) { + // Arrange + MutationCondition condition = mock(deleteConditionClass); + when(delete.getCondition()).thenReturn(Optional.of(condition)); + + // Act Assert + assertThatCode(() -> checker.check(delete)).doesNotThrowAnyException(); + verify(conditionChecker).check(condition, false); + } + + @Test + public void + checkForDelete_WithConditionThatTargetMetadataColumns_ShouldThrowIllegalArgumentException() + throws ExecutionException { + // Arrange + MutationCondition condition = + ConditionBuilder.deleteIf(ConditionBuilder.column(ANY_COL_1).isNullInt()) + .and(ConditionBuilder.column(ANY_METADATA_COL_1).isNullText()) + .build(); + when(delete.getCondition()).thenReturn(Optional.of(condition)); + when(delete.forFullTableName()).thenReturn(Optional.of("ns.tbl")); + + // Act Assert + assertThatThrownBy(() -> checker.check(delete)).isInstanceOf(IllegalArgumentException.class); + verify(metadataManager).getTransactionTableMetadata(delete); + } + + @Test + public void + checkForDelete_WithConditionThatDoNotTargetMetadataColumns_ShouldCallConditionChecker() + throws ExecutionException { + // Arrange + MutationCondition condition = + ConditionBuilder.deleteIf(ConditionBuilder.column(ANY_COL_1).isNullInt()) + .and(ConditionBuilder.column(ANY_COL_2).isNullText()) + .build(); + when(delete.getCondition()).thenReturn(Optional.of(condition)); + + // Act Assert + assertThatCode(() -> checker.check(delete)).doesNotThrowAnyException(); + verify(metadataManager).getTransactionTableMetadata(delete); + verify(conditionChecker).check(condition, false); + } + + @Test + public void checkForGet_WithMetadataColumnsInProjection_ShouldThrowIllegalArgumentException() { + // Arrange + Get get = + Get.newBuilder() + .namespace("ns") + .table("tbl") + .partitionKey(Key.ofInt("pk", 1)) + .projections(ANY_COL_1, ANY_METADATA_COL_1) + .build(); + TransactionContext context = + new TransactionContext("txId", null, Isolation.SNAPSHOT, false, false); + + // Act Assert + assertThatThrownBy(() -> checker.check(get, context)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("ns.tbl") + .hasMessageContaining(ANY_METADATA_COL_1); + } + + @Test + public void checkForGet_WithMetadataColumnsInCondition_ShouldThrowIllegalArgumentException() { + // Arrange + Get get = + Get.newBuilder() + .namespace("ns") + .table("tbl") + .partitionKey(Key.ofInt("pk", 1)) + .where(ConditionBuilder.column(ANY_METADATA_COL_1).isEqualToInt(10)) + .build(); + TransactionContext context = + new TransactionContext("txId", null, Isolation.SNAPSHOT, false, false); + + // Act Assert + assertThatThrownBy(() -> checker.check(get, context)) + .isInstanceOf(IllegalArgumentException.class); + } + + @Test + public void checkForGet_IncludeMetadataEnabled_ShouldNotThrowException() { + // Arrange + checker = spy(new ConsensusCommitOperationChecker(metadataManager, true)); + + Get get = + Get.newBuilder() + .namespace("ns") + .table("tbl") + .partitionKey(Key.ofInt("pk", 1)) + .projections(ANY_COL_1, ANY_METADATA_COL_1) + .where(ConditionBuilder.column(ANY_METADATA_COL_1).isEqualToInt(10)) + .build(); + TransactionContext context = + new TransactionContext("txId", null, Isolation.SNAPSHOT, false, false); + + // Act Assert + assertThatCode(() -> checker.check(get, context)).doesNotThrowAnyException(); + } + + @Test + public void checkForGet_WithSecondaryIndexInSerializable_ShouldThrowIllegalArgumentException() { + // Arrange + TableMetadata metadata = + TableMetadata.newBuilder() + .addColumn("pk", DataType.INT) + .addColumn("idx", DataType.INT) + .addPartitionKey("pk") + .addSecondaryIndex("idx") + .build(); + when(tableMetadata.getTableMetadata()).thenReturn(metadata); + + Get get = Get.newBuilder().namespace("ns").table("tbl").indexKey(Key.ofInt("idx", 100)).build(); + TransactionContext context = + new TransactionContext("txId", null, Isolation.SERIALIZABLE, false, false); + + // Act Assert + assertThatThrownBy(() -> checker.check(get, context)) + .isInstanceOf(IllegalArgumentException.class); + } + + @Test + public void checkForGet_WithIndexKeyUsingPartitionKeyInSerializable_ShouldNotThrowException() { + // Arrange + TableMetadata metadata = + TableMetadata.newBuilder() + .addColumn("pk", DataType.INT) + .addColumn("col", DataType.INT) + .addPartitionKey("pk") + .addSecondaryIndex("pk") + .build(); + when(tableMetadata.getTableMetadata()).thenReturn(metadata); + + Get get = Get.newBuilder().namespace("ns").table("tbl").indexKey(Key.ofInt("pk", 100)).build(); + TransactionContext context = + new TransactionContext("txId", null, Isolation.SERIALIZABLE, false, false); + + // Act Assert + assertThatCode(() -> checker.check(get, context)).doesNotThrowAnyException(); + } + + @Test + public void checkForGet_WithIndexKeyUsingClusteringKeyInSerializable_ShouldNotThrowException() { + // Arrange + TableMetadata metadata = + TableMetadata.newBuilder() + .addColumn("pk", DataType.INT) + .addColumn("ck", DataType.INT) + .addColumn("col", DataType.INT) + .addPartitionKey("pk") + .addClusteringKey("ck") + .addSecondaryIndex("ck") + .build(); + when(tableMetadata.getTableMetadata()).thenReturn(metadata); + + Get get = Get.newBuilder().namespace("ns").table("tbl").indexKey(Key.ofInt("ck", 100)).build(); + TransactionContext context = + new TransactionContext("txId", null, Isolation.SERIALIZABLE, false, false); + + // Act Assert + assertThatCode(() -> checker.check(get, context)).doesNotThrowAnyException(); + } + + @Test + public void checkForGet_ValidGet_ShouldNotThrowException() { + // Arrange + Get get = + Get.newBuilder() + .namespace("ns") + .table("tbl") + .partitionKey(Key.ofInt("pk", 1)) + .projections(ANY_COL_1, ANY_COL_2) + .build(); + TransactionContext context = + new TransactionContext("txId", null, Isolation.SNAPSHOT, false, false); + + // Act Assert + assertThatCode(() -> checker.check(get, context)).doesNotThrowAnyException(); + } + + @Test + public void checkForScan_WithMetadataColumnsInProjection_ShouldThrowIllegalArgumentException() { + // Arrange + Scan scan = + Scan.newBuilder() + .namespace("ns") + .table("tbl") + .partitionKey(Key.ofInt("pk", 1)) + .projections(ANY_COL_1, ANY_METADATA_COL_1) + .build(); + TransactionContext context = + new TransactionContext("txId", null, Isolation.SNAPSHOT, false, false); + + // Act Assert + assertThatThrownBy(() -> checker.check(scan, context)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("ns.tbl") + .hasMessageContaining(ANY_METADATA_COL_1); + } + + @Test + public void checkForScan_WithMetadataColumnsInCondition_ShouldThrowIllegalArgumentException() { + // Arrange + Scan scan = + Scan.newBuilder() + .namespace("ns") + .table("tbl") + .partitionKey(Key.ofInt("pk", 1)) + .where(ConditionBuilder.column(ANY_METADATA_COL_1).isEqualToInt(10)) + .build(); + TransactionContext context = + new TransactionContext("txId", null, Isolation.SNAPSHOT, false, false); + + // Act Assert + assertThatThrownBy(() -> checker.check(scan, context)) + .isInstanceOf(IllegalArgumentException.class); + } + + @Test + public void checkForScan_WithMetadataColumnsInOrdering_ShouldThrowIllegalArgumentException() { + // Arrange + Scan scan = + Scan.newBuilder() + .namespace("ns") + .table("tbl") + .partitionKey(Key.ofInt("pk", 1)) + .orderings(Scan.Ordering.asc(ANY_METADATA_COL_1)) + .build(); + TransactionContext context = + new TransactionContext("txId", null, Isolation.SNAPSHOT, false, false); + + // Act Assert + assertThatThrownBy(() -> checker.check(scan, context)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("ns.tbl") + .hasMessageContaining(ANY_METADATA_COL_1); + } + + @Test + public void checkForScan_IncludeMetadataEnabled_ShouldNotThrowException() { + // Arrange + checker = spy(new ConsensusCommitOperationChecker(metadataManager, true)); + + Scan scan = + Scan.newBuilder() + .namespace("ns") + .table("tbl") + .partitionKey(Key.ofInt("pk", 1)) + .projections(ANY_COL_1, ANY_METADATA_COL_1) + .where(ConditionBuilder.column(ANY_METADATA_COL_1).isEqualToInt(10)) + .orderings(Scan.Ordering.asc(ANY_METADATA_COL_1)) + .build(); + TransactionContext context = + new TransactionContext("txId", null, Isolation.SNAPSHOT, false, false); + + // Act Assert + assertThatCode(() -> checker.check(scan, context)).doesNotThrowAnyException(); + } + + @Test + public void checkForScan_WithSecondaryIndexInSerializable_ShouldThrowIllegalArgumentException() { + // Arrange + TableMetadata metadata = + TableMetadata.newBuilder() + .addColumn("pk", DataType.INT) + .addColumn("idx", DataType.INT) + .addPartitionKey("pk") + .addSecondaryIndex("idx") + .build(); + when(tableMetadata.getTableMetadata()).thenReturn(metadata); + + Scan scan = + Scan.newBuilder().namespace("ns").table("tbl").indexKey(Key.ofInt("idx", 100)).build(); + TransactionContext context = + new TransactionContext("txId", null, Isolation.SERIALIZABLE, false, false); + + // Act Assert + assertThatThrownBy(() -> checker.check(scan, context)) + .isInstanceOf(IllegalArgumentException.class); + } + + @Test + public void checkForScan_WithIndexKeyUsingPartitionKeyInSerializable_ShouldNotThrowException() { + // Arrange + TableMetadata metadata = + TableMetadata.newBuilder() + .addColumn("pk", DataType.INT) + .addColumn("col", DataType.INT) + .addPartitionKey("pk") + .addSecondaryIndex("pk") + .build(); + when(tableMetadata.getTableMetadata()).thenReturn(metadata); + + Scan scan = + Scan.newBuilder().namespace("ns").table("tbl").indexKey(Key.ofInt("pk", 100)).build(); + TransactionContext context = + new TransactionContext("txId", null, Isolation.SERIALIZABLE, false, false); + + // Act Assert + assertThatCode(() -> checker.check(scan, context)).doesNotThrowAnyException(); + } + + @Test + public void checkForScan_WithIndexKeyUsingClusteringKeyInSerializable_ShouldNotThrowException() { + // Arrange + TableMetadata metadata = + TableMetadata.newBuilder() + .addColumn("pk", DataType.INT) + .addColumn("ck", DataType.INT) + .addColumn("col", DataType.INT) + .addPartitionKey("pk") + .addClusteringKey("ck") + .addSecondaryIndex("ck") + .build(); + when(tableMetadata.getTableMetadata()).thenReturn(metadata); + + Scan scan = + Scan.newBuilder().namespace("ns").table("tbl").indexKey(Key.ofInt("ck", 100)).build(); + TransactionContext context = + new TransactionContext("txId", null, Isolation.SERIALIZABLE, false, false); + + // Act Assert + assertThatCode(() -> checker.check(scan, context)).doesNotThrowAnyException(); + } + + @Test + public void + checkForScan_ScanAllWithConditionOnIndexedColumnInSerializable_ShouldThrowIllegalArgumentException() { + // Arrange + Set secondaryIndexNames = new LinkedHashSet<>(Collections.singletonList("idx_col")); + when(tableMetadata.getSecondaryIndexNames()).thenReturn(secondaryIndexNames); + + Scan scan = + ScanAll.newBuilder() + .namespace("ns") + .table("tbl") + .all() + .where(ConditionBuilder.column("idx_col").isEqualToInt(100)) + .build(); + TransactionContext context = + new TransactionContext("txId", null, Isolation.SERIALIZABLE, false, false); + + // Act Assert + assertThatThrownBy(() -> checker.check(scan, context)) + .isInstanceOf(IllegalArgumentException.class); + } + + @Test + public void + checkForScan_ScanAllWithConditionOnNonIndexedColumnInSerializable_ShouldNotThrowException() { + // Arrange + Set secondaryIndexNames = new LinkedHashSet<>(Collections.singletonList("idx_col")); + when(tableMetadata.getSecondaryIndexNames()).thenReturn(secondaryIndexNames); + + Scan scan = + ScanAll.newBuilder() + .namespace("ns") + .table("tbl") + .all() + .where(ConditionBuilder.column("non_idx_col").isEqualToInt(100)) + .build(); + TransactionContext context = + new TransactionContext("txId", null, Isolation.SERIALIZABLE, false, false); + + // Act Assert + assertThatCode(() -> checker.check(scan, context)).doesNotThrowAnyException(); + } + + @Test + public void + checkForScan_RegularScanWithConditionOnIndexedColumnInSerializable_ShouldNotThrowException() { + // Arrange + TableMetadata metadata = + TableMetadata.newBuilder() + .addColumn("pk", DataType.INT) + .addColumn("idx_col", DataType.INT) + .addPartitionKey("pk") + .addSecondaryIndex("idx_col") + .build(); + Set secondaryIndexNames = new LinkedHashSet<>(Collections.singletonList("idx_col")); + when(tableMetadata.getTableMetadata()).thenReturn(metadata); + when(tableMetadata.getSecondaryIndexNames()).thenReturn(secondaryIndexNames); + + Scan scan = + Scan.newBuilder() + .namespace("ns") + .table("tbl") + .partitionKey(Key.ofInt("pk", 1)) + .where(ConditionBuilder.column("idx_col").isEqualToInt(100)) + .build(); + TransactionContext context = + new TransactionContext("txId", null, Isolation.SERIALIZABLE, false, false); + + // Act Assert + assertThatCode(() -> checker.check(scan, context)).doesNotThrowAnyException(); + } + + @Test + public void checkForScan_ValidScan_ShouldNotThrowException() { + // Arrange + Scan scan = + Scan.newBuilder() + .namespace("ns") + .table("tbl") + .partitionKey(Key.ofInt("pk", 1)) + .projections(ANY_COL_1, ANY_COL_2) + .build(); + TransactionContext context = + new TransactionContext("txId", null, Isolation.SNAPSHOT, false, false); + + // Act Assert + assertThatCode(() -> checker.check(scan, context)).doesNotThrowAnyException(); + } +} diff --git a/core/src/test/java/com/scalar/db/transaction/consensuscommit/ConsensusCommitTest.java b/core/src/test/java/com/scalar/db/transaction/consensuscommit/ConsensusCommitTest.java index 3370d54429..fb7b50162e 100644 --- a/core/src/test/java/com/scalar/db/transaction/consensuscommit/ConsensusCommitTest.java +++ b/core/src/test/java/com/scalar/db/transaction/consensuscommit/ConsensusCommitTest.java @@ -58,7 +58,7 @@ public class ConsensusCommitTest { @Mock private Snapshot snapshot; @Mock private CrudHandler crud; @Mock private CommitHandler commit; - @Mock private ConsensusCommitMutationOperationChecker mutationOperationChecker; + @Mock private ConsensusCommitOperationChecker operationChecker; private ConsensusCommit consensus; @@ -68,7 +68,7 @@ public void setUp() throws Exception { // Arrange context = spy(new TransactionContext(ANY_ID, snapshot, Isolation.SNAPSHOT, false, false)); - consensus = new ConsensusCommit(context, crud, commit, mutationOperationChecker, null); + consensus = new ConsensusCommit(context, crud, commit, operationChecker, null); } private Get prepareGet() { @@ -115,9 +115,10 @@ private Delete prepareDelete() { } @Test - public void get_GetGiven_ShouldCallCrudHandlerGet() throws CrudException { + public void get_GetGiven_ShouldCallCrudHandlerGet() throws CrudException, ExecutionException { // Arrange Get get = prepareGet(); + doNothing().when(operationChecker).check(get, context); TransactionResult result = mock(TransactionResult.class); when(crud.get(get, context)).thenReturn(Optional.of(result)); @@ -126,13 +127,32 @@ public void get_GetGiven_ShouldCallCrudHandlerGet() throws CrudException { // Assert assertThat(actual).isPresent(); + verify(operationChecker).check(get, context); verify(crud).get(get, context); } @Test - public void scan_ScanGiven_ShouldCallCrudHandlerScan() throws CrudException { + public void get_OperationCheckerThrowsExecutionException_ShouldThrowCrudException() + throws ExecutionException, CrudException { + // Arrange + Get get = prepareGet(); + ExecutionException exception = new ExecutionException("operation check failed"); + doThrow(exception).when(operationChecker).check(get, context); + + // Act Assert + assertThatThrownBy(() -> consensus.get(get)) + .isInstanceOf(CrudException.class) + .hasMessage("operation check failed. Transaction ID: " + ANY_ID) + .hasCause(exception); + verify(operationChecker).check(get, context); + verify(crud, never()).get(any(), any()); + } + + @Test + public void scan_ScanGiven_ShouldCallCrudHandlerScan() throws CrudException, ExecutionException { // Arrange Scan scan = prepareScan(); + doNothing().when(operationChecker).check(scan, context); TransactionResult result = mock(TransactionResult.class); List results = Collections.singletonList(result); when(crud.scan(scan, context)).thenReturn(results); @@ -142,14 +162,33 @@ public void scan_ScanGiven_ShouldCallCrudHandlerScan() throws CrudException { // Assert assertThat(actual.size()).isEqualTo(1); + verify(operationChecker).check(scan, context); verify(crud).scan(scan, context); } + @Test + public void scan_OperationCheckerThrowsExecutionException_ShouldThrowCrudException() + throws ExecutionException, CrudException { + // Arrange + Scan scan = prepareScan(); + ExecutionException exception = new ExecutionException("operation check failed"); + doThrow(exception).when(operationChecker).check(scan, context); + + // Act Assert + assertThatThrownBy(() -> consensus.scan(scan)) + .isInstanceOf(CrudException.class) + .hasMessage("operation check failed. Transaction ID: " + ANY_ID) + .hasCause(exception); + verify(operationChecker).check(scan, context); + verify(crud, never()).scan(any(), any()); + } + @Test public void getScannerAndScannerOne_ShouldCallCrudHandlerGetScannerAndScannerOne() - throws CrudException { + throws CrudException, ExecutionException { // Arrange Scan scan = prepareScan(); + doNothing().when(operationChecker).check(scan, context); TransactionCrudOperable.Scanner scanner = mock(TransactionCrudOperable.Scanner.class); Result result = mock(Result.class); when(scanner.one()).thenReturn(Optional.of(result)); @@ -161,15 +200,17 @@ public void getScannerAndScannerOne_ShouldCallCrudHandlerGetScannerAndScannerOne // Assert assertThat(actualResult).hasValue(result); + verify(operationChecker).check(scan, context); verify(crud).getScanner(scan, context); verify(scanner).one(); } @Test public void getScannerAndScannerAll_ShouldCallCrudHandlerGetScannerAndScannerAll() - throws CrudException { + throws CrudException, ExecutionException { // Arrange Scan scan = prepareScan(); + doNothing().when(operationChecker).check(scan, context); TransactionCrudOperable.Scanner scanner = mock(TransactionCrudOperable.Scanner.class); Result result1 = mock(Result.class); Result result2 = mock(Result.class); @@ -182,10 +223,28 @@ public void getScannerAndScannerAll_ShouldCallCrudHandlerGetScannerAndScannerAll // Assert assertThat(actualResults).containsExactly(result1, result2); + verify(operationChecker).check(scan, context); verify(crud).getScanner(scan, context); verify(scanner).all(); } + @Test + public void getScanner_OperationCheckerThrowsExecutionException_ShouldThrowCrudException() + throws ExecutionException, CrudException { + // Arrange + Scan scan = prepareScan(); + ExecutionException exception = new ExecutionException("operation check failed"); + doThrow(exception).when(operationChecker).check(scan, context); + + // Act Assert + assertThatThrownBy(() -> consensus.getScanner(scan)) + .isInstanceOf(CrudException.class) + .hasMessage("operation check failed. Transaction ID: " + ANY_ID) + .hasCause(exception); + verify(operationChecker).check(scan, context); + verify(crud, never()).getScanner(any(), any()); + } + @Test public void put_PutGiven_ShouldCallCrudHandlerPut() throws ExecutionException, CrudException { // Arrange @@ -197,7 +256,7 @@ public void put_PutGiven_ShouldCallCrudHandlerPut() throws ExecutionException, C // Assert verify(crud).put(put, context); - verify(mutationOperationChecker).check(put); + verify(operationChecker).check(put); } @Test @@ -212,7 +271,7 @@ public void put_TwoPutsGiven_ShouldCallCrudHandlerPutTwice() // Assert verify(crud, times(2)).put(put, context); - verify(mutationOperationChecker, times(2)).check(put); + verify(operationChecker, times(2)).check(put); } @Test @@ -227,7 +286,7 @@ public void delete_DeleteGiven_ShouldCallCrudHandlerDelete() // Assert verify(crud).delete(delete, context); - verify(mutationOperationChecker).check(delete); + verify(operationChecker).check(delete); } @Test @@ -242,7 +301,7 @@ public void delete_TwoDeletesGiven_ShouldCallCrudHandlerDeleteTwice() // Assert verify(crud, times(2)).delete(delete, context); - verify(mutationOperationChecker, times(2)).check(delete); + verify(operationChecker, times(2)).check(delete); } @Test @@ -272,7 +331,7 @@ public void insert_InsertGiven_ShouldCallCrudHandlerPut() .enableInsertMode() .build(); verify(crud).put(expectedPut, context); - verify(mutationOperationChecker).check(expectedPut); + verify(operationChecker).check(expectedPut); } @Test @@ -302,7 +361,7 @@ public void upsert_UpsertGiven_ShouldCallCrudHandlerPut() .enableImplicitPreRead() .build(); verify(crud).put(expectedPut, context); - verify(mutationOperationChecker).check(expectedPut); + verify(operationChecker).check(expectedPut); } @Test @@ -333,7 +392,7 @@ public void update_UpdateWithoutConditionGiven_ShouldCallCrudHandlerPut() .enableImplicitPreRead() .build(); verify(crud).put(expectedPut, context); - verify(mutationOperationChecker).check(expectedPut); + verify(operationChecker).check(expectedPut); } @Test @@ -371,7 +430,7 @@ public void update_UpdateWithConditionGiven_ShouldCallCrudHandlerPut() .enableImplicitPreRead() .build(); verify(crud).put(expectedPut, context); - verify(mutationOperationChecker).check(expectedPut); + verify(operationChecker).check(expectedPut); } @Test @@ -558,11 +617,11 @@ public void mutate_MutationsGiven_ShouldCallCrudHandlerPutAndDelete() verify(crud).put(expectedPutFromUpsert, context); verify(crud).put(expectedPutFromUpdate, context); verify(crud).delete(delete, context); - verify(mutationOperationChecker).check(put); - verify(mutationOperationChecker).check(expectedPutFromInsert); - verify(mutationOperationChecker).check(expectedPutFromUpsert); - verify(mutationOperationChecker).check(expectedPutFromUpdate); - verify(mutationOperationChecker).check(delete); + verify(operationChecker).check(put); + verify(operationChecker).check(expectedPutFromInsert); + verify(operationChecker).check(expectedPutFromUpsert); + verify(operationChecker).check(expectedPutFromUpdate); + verify(operationChecker).check(delete); } @Test @@ -660,11 +719,11 @@ public void batch_OperationsGiven_ShouldCallCrudHandlerProperly() verify(crud).put(expectedPutFromUpsert, context); verify(crud).put(expectedPutFromUpdate, context); verify(crud).delete(delete, context); - verify(mutationOperationChecker).check(put); - verify(mutationOperationChecker).check(expectedPutFromInsert); - verify(mutationOperationChecker).check(expectedPutFromUpsert); - verify(mutationOperationChecker).check(expectedPutFromUpdate); - verify(mutationOperationChecker).check(delete); + verify(operationChecker).check(put); + verify(operationChecker).check(expectedPutFromInsert); + verify(operationChecker).check(expectedPutFromUpsert); + verify(operationChecker).check(expectedPutFromUpdate); + verify(operationChecker).check(delete); assertThat(results).hasSize(7); assertThat(results.get(0).getType()).isEqualTo(CrudOperable.BatchResult.Type.GET); assertThat(results.get(0).getGetResult()).hasValue(result1); @@ -708,7 +767,7 @@ public void commit_ProcessedCrudGiven_InReadOnlyMode_ShouldCommitWithSnapshot() // Arrange doNothing().when(commit).commit(any(TransactionContext.class)); context = spy(new TransactionContext(ANY_ID, snapshot, Isolation.SNAPSHOT, true, false)); - consensus = new ConsensusCommit(context, crud, commit, mutationOperationChecker, null); + consensus = new ConsensusCommit(context, crud, commit, operationChecker, null); // Act consensus.commit(); @@ -796,7 +855,7 @@ public void rollback_WithGroupCommitter_ShouldRemoveTxFromGroupCommitter() // Arrange CoordinatorGroupCommitter groupCommitter = mock(CoordinatorGroupCommitter.class); ConsensusCommit consensusWithGroupCommit = - new ConsensusCommit(context, crud, commit, mutationOperationChecker, groupCommitter); + new ConsensusCommit(context, crud, commit, operationChecker, groupCommitter); // Act consensusWithGroupCommit.rollback(); @@ -815,7 +874,7 @@ public void rollback_WithGroupCommitter_InReadOnlyMode_ShouldNotRemoveTxFromGrou context = spy(new TransactionContext(ANY_ID, snapshot, Isolation.SNAPSHOT, true, false)); CoordinatorGroupCommitter groupCommitter = mock(CoordinatorGroupCommitter.class); ConsensusCommit consensusWithGroupCommit = - new ConsensusCommit(context, crud, commit, mutationOperationChecker, groupCommitter); + new ConsensusCommit(context, crud, commit, operationChecker, groupCommitter); // Act consensusWithGroupCommit.rollback(); diff --git a/core/src/test/java/com/scalar/db/transaction/consensuscommit/ConsensusCommitUtilsTest.java b/core/src/test/java/com/scalar/db/transaction/consensuscommit/ConsensusCommitUtilsTest.java index a17a2f21d9..fc63a28e07 100644 --- a/core/src/test/java/com/scalar/db/transaction/consensuscommit/ConsensusCommitUtilsTest.java +++ b/core/src/test/java/com/scalar/db/transaction/consensuscommit/ConsensusCommitUtilsTest.java @@ -10,6 +10,7 @@ import com.scalar.db.api.Consistency; import com.scalar.db.api.Get; import com.scalar.db.api.Scan; +import com.scalar.db.api.ScanAll; import com.scalar.db.api.TableMetadata; import com.scalar.db.io.Column; import com.scalar.db.io.DataType; @@ -921,6 +922,83 @@ public void prepareGetForStorage_GetWithConjunctions_shouldConvertConjunctions() .or(column(Attribute.BEFORE_PREFIX + "col4").isLikeText("%pattern%", "\\")) .consistency(Consistency.LINEARIZABLE) .build()); + + // Condition on secondary index (should be converted for Get) + assertThat( + ConsensusCommitUtils.prepareGetForStorage( + Get.newBuilder() + .namespace("ns") + .table("tbl") + .partitionKey(Key.ofInt("pk", 100)) + .clusteringKey(Key.ofInt("ck", 200)) + .where(column("col3").isEqualToInt(300)) + .build(), + metadata)) + .isEqualTo( + Get.newBuilder() + .namespace("ns") + .table("tbl") + .partitionKey(Key.ofInt("pk", 100)) + .clusteringKey(Key.ofInt("ck", 200)) + .where(column("col3").isEqualToInt(300)) + .or(column(Attribute.BEFORE_PREFIX + "col3").isEqualToInt(300)) + .consistency(Consistency.LINEARIZABLE) + .build()); + + // Mixed condition: secondary index and non-indexed column (both should be converted for Get) + assertThat( + ConsensusCommitUtils.prepareGetForStorage( + Get.newBuilder() + .namespace("ns") + .table("tbl") + .partitionKey(Key.ofInt("pk", 100)) + .clusteringKey(Key.ofInt("ck", 200)) + .where(column("col3").isEqualToInt(300)) + .and(column("col1").isEqualToInt(10)) + .build(), + metadata)) + .isEqualTo( + Get.newBuilder() + .namespace("ns") + .table("tbl") + .partitionKey(Key.ofInt("pk", 100)) + .clusteringKey(Key.ofInt("ck", 200)) + .where( + condition(column("col3").isEqualToInt(300)) + .and(column("col1").isEqualToInt(10)) + .build()) + .or( + condition(column(Attribute.BEFORE_PREFIX + "col3").isEqualToInt(300)) + .and(column(Attribute.BEFORE_PREFIX + "col1").isEqualToInt(10)) + .build()) + .consistency(Consistency.LINEARIZABLE) + .build()); + + // Mixed condition with OR: secondary index OR non-indexed column (both should be converted + // for Get) + assertThat( + ConsensusCommitUtils.prepareGetForStorage( + Get.newBuilder() + .namespace("ns") + .table("tbl") + .partitionKey(Key.ofInt("pk", 100)) + .clusteringKey(Key.ofInt("ck", 200)) + .where(column("col3").isEqualToInt(300)) + .or(column("col1").isEqualToInt(10)) + .build(), + metadata)) + .isEqualTo( + Get.newBuilder() + .namespace("ns") + .table("tbl") + .partitionKey(Key.ofInt("pk", 100)) + .clusteringKey(Key.ofInt("ck", 200)) + .where(column("col3").isEqualToInt(300)) + .or(column(Attribute.BEFORE_PREFIX + "col3").isEqualToInt(300)) + .or(column("col1").isEqualToInt(10)) + .or(column(Attribute.BEFORE_PREFIX + "col1").isEqualToInt(10)) + .consistency(Consistency.LINEARIZABLE) + .build()); } @Test @@ -1287,5 +1365,172 @@ public void prepareScanForStorage_ScanWithConjunctions_shouldConvertConjunctions .or(column(Attribute.BEFORE_PREFIX + "col1").isEqualToInt(10)) .consistency(Consistency.LINEARIZABLE) .build()); + + // Condition on secondary index (should be converted) + assertThat( + ConsensusCommitUtils.prepareScanForStorage( + Scan.newBuilder() + .namespace("ns") + .table("tbl") + .partitionKey(Key.ofInt("pk", 100)) + .where(column("col3").isEqualToInt(300)) + .build(), + metadata)) + .isEqualTo( + Scan.newBuilder() + .namespace("ns") + .table("tbl") + .partitionKey(Key.ofInt("pk", 100)) + .where(column("col3").isEqualToInt(300)) + .or(column(Attribute.BEFORE_PREFIX + "col3").isEqualToInt(300)) + .consistency(Consistency.LINEARIZABLE) + .build()); + } + + @Test + public void + prepareScanForStorage_ScanAllWithConjunctionsOnSecondaryIndex_shouldNotConvertSecondaryIndexConditions() { + // Arrange + TableMetadata metadata = + ConsensusCommitUtils.buildTransactionTableMetadata( + TableMetadata.newBuilder() + .addColumn("pk", DataType.INT) + .addColumn("ck", DataType.INT) + .addColumn("col1", DataType.INT) + .addColumn("col2", DataType.INT) + .addColumn("col3", DataType.INT) + .addColumn("col4", DataType.TEXT) + .addPartitionKey("pk") + .addClusteringKey("ck") + .addSecondaryIndex("col3") + .build()); + + // Act Assert + + // Condition on secondary index (should NOT be converted for ScanAll) + assertThat( + ConsensusCommitUtils.prepareScanForStorage( + ScanAll.newBuilder() + .namespace("ns") + .table("tbl") + .all() + .where(column("col3").isEqualToInt(300)) + .build(), + metadata)) + .isEqualTo( + ScanAll.newBuilder() + .namespace("ns") + .table("tbl") + .all() + .where(column("col3").isEqualToInt(300)) + .consistency(Consistency.LINEARIZABLE) + .build()); + + // Condition on non-indexed column (should be converted for ScanAll) + assertThat( + ConsensusCommitUtils.prepareScanForStorage( + ScanAll.newBuilder() + .namespace("ns") + .table("tbl") + .all() + .where(column("col1").isEqualToInt(10)) + .build(), + metadata)) + .isEqualTo( + ScanAll.newBuilder() + .namespace("ns") + .table("tbl") + .all() + .where(column("col1").isEqualToInt(10)) + .or(column(Attribute.BEFORE_PREFIX + "col1").isEqualToInt(10)) + .consistency(Consistency.LINEARIZABLE) + .build()); + + // Mixed condition: secondary index and non-indexed column + assertThat( + ConsensusCommitUtils.prepareScanForStorage( + ScanAll.newBuilder() + .namespace("ns") + .table("tbl") + .all() + .where(column("col3").isEqualToInt(300)) + .and(column("col1").isEqualToInt(10)) + .build(), + metadata)) + .isEqualTo( + ScanAll.newBuilder() + .namespace("ns") + .table("tbl") + .all() + .where( + condition(column("col3").isEqualToInt(300)) + .and(column("col1").isEqualToInt(10)) + .build()) + .or( + condition(column("col3").isEqualToInt(300)) + .and(column(Attribute.BEFORE_PREFIX + "col1").isEqualToInt(10)) + .build()) + .consistency(Consistency.LINEARIZABLE) + .build()); + + // Mixed condition with OR: secondary index OR non-indexed column + assertThat( + ConsensusCommitUtils.prepareScanForStorage( + ScanAll.newBuilder() + .namespace("ns") + .table("tbl") + .all() + .where(column("col3").isEqualToInt(300)) + .or(column("col1").isEqualToInt(10)) + .build(), + metadata)) + .isEqualTo( + ScanAll.newBuilder() + .namespace("ns") + .table("tbl") + .all() + .where(column("col3").isEqualToInt(300)) + .or(column("col1").isEqualToInt(10)) + .or(column(Attribute.BEFORE_PREFIX + "col1").isEqualToInt(10)) + .consistency(Consistency.LINEARIZABLE) + .build()); + + // Condition on partition key should not be converted + assertThat( + ConsensusCommitUtils.prepareScanForStorage( + ScanAll.newBuilder() + .namespace("ns") + .table("tbl") + .all() + .where(column("pk").isEqualToInt(100)) + .build(), + metadata)) + .isEqualTo( + ScanAll.newBuilder() + .namespace("ns") + .table("tbl") + .all() + .where(column("pk").isEqualToInt(100)) + .consistency(Consistency.LINEARIZABLE) + .build()); + + // Condition on clustering key should not be converted + assertThat( + ConsensusCommitUtils.prepareScanForStorage( + ScanAll.newBuilder() + .namespace("ns") + .table("tbl") + .all() + .where(column("ck").isGreaterThanInt(150)) + .build(), + metadata)) + .isEqualTo( + ScanAll.newBuilder() + .namespace("ns") + .table("tbl") + .all() + .where(column("ck").isGreaterThanInt(150)) + .consistency(Consistency.LINEARIZABLE) + .build()); } } diff --git a/core/src/test/java/com/scalar/db/transaction/consensuscommit/CrudHandlerTest.java b/core/src/test/java/com/scalar/db/transaction/consensuscommit/CrudHandlerTest.java index 4cb4a6a5e9..6209315145 100644 --- a/core/src/test/java/com/scalar/db/transaction/consensuscommit/CrudHandlerTest.java +++ b/core/src/test/java/com/scalar/db/transaction/consensuscommit/CrudHandlerTest.java @@ -3,7 +3,6 @@ import static com.scalar.db.api.ConditionBuilder.column; import static com.scalar.db.api.ConditionBuilder.deleteIfExists; import static com.scalar.db.api.ConditionBuilder.putIfExists; -import static com.scalar.db.api.ConditionSetBuilder.condition; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.mockito.ArgumentMatchers.any; @@ -1903,7 +1902,7 @@ public void put_PutWithoutConditionGiven_ShouldCallAppropriateMethods() throws C // Assert verify(spied, never()).readUnread(any(), any(), any(), any()); verify(snapshot).getResult(key); - verify(mutationConditionsValidator).checkIfConditionIsSatisfied(put, result, context); + verify(mutationConditionsValidator).checkIfConditionIsSatisfied(put, result, ANY_ID_1); verify(snapshot).putIntoWriteSet(key, put); } @@ -1947,7 +1946,7 @@ public void put_PutWithoutConditionGiven_ShouldCallAppropriateMethods() throws C // Assert verify(spied).read(key, getForKey, context, TABLE_METADATA); verify(snapshot).getResult(key); - verify(mutationConditionsValidator).checkIfConditionIsSatisfied(put, result, context); + verify(mutationConditionsValidator).checkIfConditionIsSatisfied(put, result, ANY_ID_1); verify(snapshot).putIntoWriteSet(key, put); } @@ -1980,7 +1979,7 @@ public void put_PutWithoutConditionGiven_ShouldCallAppropriateMethods() throws C // Assert verify(spied, never()).readUnread(any(), any(), any(), any()); verify(snapshot).getResult(key); - verify(mutationConditionsValidator).checkIfConditionIsSatisfied(put, result, context); + verify(mutationConditionsValidator).checkIfConditionIsSatisfied(put, result, ANY_ID_1); verify(snapshot).putIntoWriteSet(key, put); } @@ -2058,7 +2057,7 @@ public void delete_DeleteWithConditionGiven_WithResultInReadSet_ShouldCallApprop // Assert verify(spied, never()).readUnread(any(), any(), any(), any()); verify(snapshot).getResult(key); - verify(mutationConditionsValidator).checkIfConditionIsSatisfied(delete, result, context); + verify(mutationConditionsValidator).checkIfConditionIsSatisfied(delete, result, ANY_ID_1); verify(snapshot).putIntoDeleteSet(key, delete); } @@ -2098,7 +2097,7 @@ public void delete_DeleteWithConditionGiven_WithoutResultInReadSet_ShouldCallApp // Assert verify(spied).read(key, getForKey, context, TABLE_METADATA); verify(snapshot).getResult(key); - verify(mutationConditionsValidator).checkIfConditionIsSatisfied(delete, null, context); + verify(mutationConditionsValidator).checkIfConditionIsSatisfied(delete, null, ANY_ID_1); verify(snapshot).putIntoDeleteSet(key, delete); } diff --git a/core/src/test/java/com/scalar/db/transaction/consensuscommit/MutationConditionsValidatorTest.java b/core/src/test/java/com/scalar/db/transaction/consensuscommit/MutationConditionsValidatorTest.java index 673e84f5c2..655cd62b26 100644 --- a/core/src/test/java/com/scalar/db/transaction/consensuscommit/MutationConditionsValidatorTest.java +++ b/core/src/test/java/com/scalar/db/transaction/consensuscommit/MutationConditionsValidatorTest.java @@ -32,6 +32,9 @@ import org.mockito.MockitoAnnotations; public class MutationConditionsValidatorTest { + + private static final String TRANSACTION_ID = "id"; + private MutationConditionsValidator validator; @Mock private TransactionResult existingRecord; @Mock private Put put; @@ -45,14 +48,10 @@ public class MutationConditionsValidatorTest { private static final String C2 = "col_2"; private static final String C3 = "col_3"; - private TransactionContext context; - @Mock private Snapshot snapshot; - @BeforeEach public void setUp() throws Exception { MockitoAnnotations.openMocks(this).close(); validator = new MutationConditionsValidator(); - context = new TransactionContext("a_tx_id", snapshot, Isolation.SNAPSHOT, false, false); } @Test @@ -62,7 +61,8 @@ public void setUp() throws Exception { prepareMutationWithCondition(put, putIf); // Act Assert - Assertions.assertThatThrownBy(() -> validator.checkIfConditionIsSatisfied(put, null, context)) + Assertions.assertThatThrownBy( + () -> validator.checkIfConditionIsSatisfied(put, null, TRANSACTION_ID)) .isInstanceOf(UnsatisfiedConditionException.class); } @@ -73,7 +73,8 @@ public void setUp() throws Exception { prepareMutationWithCondition(put, putIfExists); // Act Assert - Assertions.assertThatThrownBy(() -> validator.checkIfConditionIsSatisfied(put, null, context)) + Assertions.assertThatThrownBy( + () -> validator.checkIfConditionIsSatisfied(put, null, TRANSACTION_ID)) .isInstanceOf(UnsatisfiedConditionException.class); } @@ -85,7 +86,7 @@ public void setUp() throws Exception { // Act Assert Assertions.assertThatCode( - () -> validator.checkIfConditionIsSatisfied(put, existingRecord, context)) + () -> validator.checkIfConditionIsSatisfied(put, existingRecord, TRANSACTION_ID)) .doesNotThrowAnyException(); } @@ -97,7 +98,7 @@ public void setUp() throws Exception { // Act Assert Assertions.assertThatThrownBy( - () -> validator.checkIfConditionIsSatisfied(put, existingRecord, context)) + () -> validator.checkIfConditionIsSatisfied(put, existingRecord, TRANSACTION_ID)) .isInstanceOf(UnsatisfiedConditionException.class); } @@ -108,7 +109,8 @@ public void setUp() throws Exception { prepareMutationWithCondition(put, putIfNotExists); // Act Assert - Assertions.assertThatCode(() -> validator.checkIfConditionIsSatisfied(put, null, context)) + Assertions.assertThatCode( + () -> validator.checkIfConditionIsSatisfied(put, null, TRANSACTION_ID)) .doesNotThrowAnyException(); } @@ -120,7 +122,7 @@ public void setUp() throws Exception { // Act Assert Assertions.assertThatThrownBy( - () -> validator.checkIfConditionIsSatisfied(delete, null, context)) + () -> validator.checkIfConditionIsSatisfied(delete, null, TRANSACTION_ID)) .isInstanceOf(UnsatisfiedConditionException.class); } @@ -132,7 +134,7 @@ public void setUp() throws Exception { // Act Assert Assertions.assertThatThrownBy( - () -> validator.checkIfConditionIsSatisfied(delete, null, context)) + () -> validator.checkIfConditionIsSatisfied(delete, null, TRANSACTION_ID)) .isInstanceOf(UnsatisfiedConditionException.class); } @@ -144,7 +146,7 @@ public void setUp() throws Exception { // Act Assert Assertions.assertThatCode( - () -> validator.checkIfConditionIsSatisfied(delete, existingRecord, context)) + () -> validator.checkIfConditionIsSatisfied(delete, existingRecord, TRANSACTION_ID)) .doesNotThrowAnyException(); } @@ -319,9 +321,9 @@ private List prepareMutationOperations( private void validateConditionIsSatisfied(Mutation mutation, TransactionResult existingRecord) throws UnsatisfiedConditionException { if (mutation instanceof Put) { - validator.checkIfConditionIsSatisfied((Put) mutation, existingRecord, context); + validator.checkIfConditionIsSatisfied((Put) mutation, existingRecord, TRANSACTION_ID); } else { - validator.checkIfConditionIsSatisfied((Delete) mutation, existingRecord, context); + validator.checkIfConditionIsSatisfied((Delete) mutation, existingRecord, TRANSACTION_ID); } } } diff --git a/core/src/test/java/com/scalar/db/transaction/consensuscommit/TwoPhaseConsensusCommitTest.java b/core/src/test/java/com/scalar/db/transaction/consensuscommit/TwoPhaseConsensusCommitTest.java index 369c62812a..05f5ab2fa7 100644 --- a/core/src/test/java/com/scalar/db/transaction/consensuscommit/TwoPhaseConsensusCommitTest.java +++ b/core/src/test/java/com/scalar/db/transaction/consensuscommit/TwoPhaseConsensusCommitTest.java @@ -3,6 +3,8 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatCode; import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; @@ -62,7 +64,7 @@ public class TwoPhaseConsensusCommitTest { @Mock private Snapshot snapshot; @Mock private CrudHandler crud; @Mock private CommitHandler commit; - @Mock private ConsensusCommitMutationOperationChecker mutationOperationChecker; + @Mock private ConsensusCommitOperationChecker operationChecker; private TwoPhaseConsensusCommit transaction; @@ -72,7 +74,7 @@ public void setUp() throws Exception { // Arrange context = spy(new TransactionContext(ANY_TX_ID, snapshot, Isolation.SNAPSHOT, false, false)); - transaction = new TwoPhaseConsensusCommit(context, crud, commit, mutationOperationChecker); + transaction = new TwoPhaseConsensusCommit(context, crud, commit, operationChecker); } private Get prepareGet() { @@ -119,9 +121,10 @@ private Delete prepareDelete() { } @Test - public void get_GetGiven_ShouldCallCrudHandlerGet() throws CrudException { + public void get_GetGiven_ShouldCallCrudHandlerGet() throws CrudException, ExecutionException { // Arrange Get get = prepareGet(); + doNothing().when(operationChecker).check(get, context); TransactionResult result = mock(TransactionResult.class); when(crud.get(get, context)).thenReturn(Optional.of(result)); @@ -130,13 +133,32 @@ public void get_GetGiven_ShouldCallCrudHandlerGet() throws CrudException { // Assert assertThat(actual).isPresent(); + verify(operationChecker).check(get, context); verify(crud).get(get, context); } @Test - public void scan_ScanGiven_ShouldCallCrudHandlerScan() throws CrudException { + public void get_OperationCheckerThrowsExecutionException_ShouldThrowCrudException() + throws ExecutionException, CrudException { + // Arrange + Get get = prepareGet(); + ExecutionException exception = new ExecutionException("operation check failed"); + doThrow(exception).when(operationChecker).check(get, context); + + // Act Assert + assertThatThrownBy(() -> transaction.get(get)) + .isInstanceOf(CrudException.class) + .hasMessage("operation check failed. Transaction ID: " + ANY_TX_ID) + .hasCause(exception); + verify(operationChecker).check(get, context); + verify(crud, never()).get(any(), any()); + } + + @Test + public void scan_ScanGiven_ShouldCallCrudHandlerScan() throws CrudException, ExecutionException { // Arrange Scan scan = prepareScan(); + doNothing().when(operationChecker).check(scan, context); TransactionResult result = mock(TransactionResult.class); List results = Collections.singletonList(result); when(crud.scan(scan, context)).thenReturn(results); @@ -146,14 +168,33 @@ public void scan_ScanGiven_ShouldCallCrudHandlerScan() throws CrudException { // Assert assertThat(actual.size()).isEqualTo(1); + verify(operationChecker).check(scan, context); verify(crud).scan(scan, context); } + @Test + public void scan_OperationCheckerThrowsExecutionException_ShouldThrowCrudException() + throws ExecutionException, CrudException { + // Arrange + Scan scan = prepareScan(); + ExecutionException exception = new ExecutionException("operation check failed"); + doThrow(exception).when(operationChecker).check(scan, context); + + // Act Assert + assertThatThrownBy(() -> transaction.scan(scan)) + .isInstanceOf(CrudException.class) + .hasMessage("operation check failed. Transaction ID: " + ANY_TX_ID) + .hasCause(exception); + verify(operationChecker).check(scan, context); + verify(crud, never()).scan(any(), any()); + } + @Test public void getScannerAndScannerOne_ShouldCallCrudHandlerGetScannerAndScannerOne() - throws CrudException { + throws CrudException, ExecutionException { // Arrange Scan scan = prepareScan(); + doNothing().when(operationChecker).check(scan, context); TransactionCrudOperable.Scanner scanner = mock(TransactionCrudOperable.Scanner.class); Result result = mock(Result.class); when(scanner.one()).thenReturn(Optional.of(result)); @@ -165,15 +206,17 @@ public void getScannerAndScannerOne_ShouldCallCrudHandlerGetScannerAndScannerOne // Assert assertThat(actualResult).hasValue(result); + verify(operationChecker).check(scan, context); verify(crud).getScanner(scan, context); verify(scanner).one(); } @Test public void getScannerAndScannerAll_ShouldCallCrudHandlerGetScannerAndScannerAll() - throws CrudException { + throws CrudException, ExecutionException { // Arrange Scan scan = prepareScan(); + doNothing().when(operationChecker).check(scan, context); TransactionCrudOperable.Scanner scanner = mock(TransactionCrudOperable.Scanner.class); Result result1 = mock(Result.class); Result result2 = mock(Result.class); @@ -186,10 +229,28 @@ public void getScannerAndScannerAll_ShouldCallCrudHandlerGetScannerAndScannerAll // Assert assertThat(actualResults).containsExactly(result1, result2); + verify(operationChecker).check(scan, context); verify(crud).getScanner(scan, context); verify(scanner).all(); } + @Test + public void getScanner_OperationCheckerThrowsExecutionException_ShouldThrowCrudException() + throws ExecutionException, CrudException { + // Arrange + Scan scan = prepareScan(); + ExecutionException exception = new ExecutionException("operation check failed"); + doThrow(exception).when(operationChecker).check(scan, context); + + // Act Assert + assertThatThrownBy(() -> transaction.getScanner(scan)) + .isInstanceOf(CrudException.class) + .hasMessage("operation check failed. Transaction ID: " + ANY_TX_ID) + .hasCause(exception); + verify(operationChecker).check(scan, context); + verify(crud, never()).getScanner(any(), any()); + } + @Test public void put_PutGiven_ShouldCallCrudHandlerPut() throws ExecutionException, CrudException { // Arrange @@ -200,7 +261,7 @@ public void put_PutGiven_ShouldCallCrudHandlerPut() throws ExecutionException, C // Assert verify(crud).put(put, context); - verify(mutationOperationChecker).check(put); + verify(operationChecker).check(put); } @Test @@ -214,7 +275,7 @@ public void put_TwoPutsGiven_ShouldCallCrudHandlerPutTwice() // Assert verify(crud, times(2)).put(put, context); - verify(mutationOperationChecker, times(2)).check(put); + verify(operationChecker, times(2)).check(put); } @Test @@ -228,7 +289,7 @@ public void delete_DeleteGiven_ShouldCallCrudHandlerDelete() // Assert verify(crud).delete(delete, context); - verify(mutationOperationChecker).check(delete); + verify(operationChecker).check(delete); } @Test @@ -242,7 +303,7 @@ public void delete_TwoDeletesGiven_ShouldCallCrudHandlerDeleteTwice() // Assert verify(crud, times(2)).delete(delete, context); - verify(mutationOperationChecker, times(2)).check(delete); + verify(operationChecker, times(2)).check(delete); } @Test @@ -272,7 +333,7 @@ public void insert_InsertGiven_ShouldCallCrudHandlerPut() .enableInsertMode() .build(); verify(crud).put(expectedPut, context); - verify(mutationOperationChecker).check(expectedPut); + verify(operationChecker).check(expectedPut); } @Test @@ -302,7 +363,7 @@ public void upsert_UpsertGiven_ShouldCallCrudHandlerPut() .enableImplicitPreRead() .build(); verify(crud).put(expectedPut, context); - verify(mutationOperationChecker).check(expectedPut); + verify(operationChecker).check(expectedPut); } @Test @@ -333,7 +394,7 @@ public void update_UpdateWithoutConditionGiven_ShouldCallCrudHandlerPut() .enableImplicitPreRead() .build(); verify(crud).put(expectedPut, context); - verify(mutationOperationChecker).check(expectedPut); + verify(operationChecker).check(expectedPut); } @Test @@ -371,7 +432,7 @@ public void update_UpdateWithConditionGiven_ShouldCallCrudHandlerPut() .enableImplicitPreRead() .build(); verify(crud).put(expectedPut, context); - verify(mutationOperationChecker).check(expectedPut); + verify(operationChecker).check(expectedPut); } @Test @@ -558,11 +619,11 @@ public void mutate_MutationsGiven_ShouldCallCrudHandlerPutAndDelete() verify(crud).put(expectedPutFromUpsert, context); verify(crud).put(expectedPutFromUpdate, context); verify(crud).delete(delete, context); - verify(mutationOperationChecker).check(put); - verify(mutationOperationChecker).check(expectedPutFromInsert); - verify(mutationOperationChecker).check(expectedPutFromUpsert); - verify(mutationOperationChecker).check(expectedPutFromUpdate); - verify(mutationOperationChecker).check(delete); + verify(operationChecker).check(put); + verify(operationChecker).check(expectedPutFromInsert); + verify(operationChecker).check(expectedPutFromUpsert); + verify(operationChecker).check(expectedPutFromUpdate); + verify(operationChecker).check(delete); } @Test @@ -660,11 +721,11 @@ public void batch_OperationsGiven_ShouldCallCrudHandlerProperly() verify(crud).put(expectedPutFromUpsert, context); verify(crud).put(expectedPutFromUpdate, context); verify(crud).delete(delete, context); - verify(mutationOperationChecker).check(put); - verify(mutationOperationChecker).check(expectedPutFromInsert); - verify(mutationOperationChecker).check(expectedPutFromUpsert); - verify(mutationOperationChecker).check(expectedPutFromUpdate); - verify(mutationOperationChecker).check(delete); + verify(operationChecker).check(put); + verify(operationChecker).check(expectedPutFromInsert); + verify(operationChecker).check(expectedPutFromUpsert); + verify(operationChecker).check(expectedPutFromUpdate); + verify(operationChecker).check(delete); assertThat(results).hasSize(7); assertThat(results.get(0).getType()).isEqualTo(CrudOperable.BatchResult.Type.GET); assertThat(results.get(0).getGetResult()).hasValue(result1); diff --git a/integration-test/src/main/java/com/scalar/db/transaction/consensuscommit/ConsensusCommitNullMetadataIntegrationTestBase.java b/integration-test/src/main/java/com/scalar/db/transaction/consensuscommit/ConsensusCommitNullMetadataIntegrationTestBase.java index 6708627277..62c5d8c1f8 100644 --- a/integration-test/src/main/java/com/scalar/db/transaction/consensuscommit/ConsensusCommitNullMetadataIntegrationTestBase.java +++ b/integration-test/src/main/java/com/scalar/db/transaction/consensuscommit/ConsensusCommitNullMetadataIntegrationTestBase.java @@ -153,6 +153,7 @@ public void setUp() throws Exception { new ConsensusCommitManager( storage, admin, + consensusCommitConfig, databaseConfig, coordinator, parallelExecutor, diff --git a/integration-test/src/main/java/com/scalar/db/transaction/consensuscommit/ConsensusCommitSpecificIntegrationTestBase.java b/integration-test/src/main/java/com/scalar/db/transaction/consensuscommit/ConsensusCommitSpecificIntegrationTestBase.java index a442c78b47..304df2244c 100644 --- a/integration-test/src/main/java/com/scalar/db/transaction/consensuscommit/ConsensusCommitSpecificIntegrationTestBase.java +++ b/integration-test/src/main/java/com/scalar/db/transaction/consensuscommit/ConsensusCommitSpecificIntegrationTestBase.java @@ -4239,10 +4239,16 @@ void scan_OverlappingPutWithConjunctionsGivenBefore_ShouldThrowIllegalArgumentEx // Act Throwable thrown = catchThrowable(() -> transaction.scan(scan)); - transaction.commit(); // Assert - assertThat(thrown).doesNotThrowAnyException(); + if (isolation == Isolation.SERIALIZABLE) { + // Index scans are not allowed in SERIALIZABLE isolation + assertThat(thrown).isInstanceOf(IllegalArgumentException.class); + transaction.rollback(); + } else { + assertThat(thrown).doesNotThrowAnyException(); + transaction.commit(); + } } @ParameterizedTest @@ -5449,7 +5455,7 @@ void scan_ScanAllWithLimitGiven_WithSerializable_ShouldNotThrowAnyException() } @Test - public void scan_ScanWithIndexGiven_WithSerializable_ShouldNotThrowAnyException() + void scan_ScanWithIndexGiven_WithSerializable_ShouldThrowIllegalArgumentException() throws TransactionException { // Arrange ConsensusCommitManager manager = createConsensusCommitManager(Isolation.SERIALIZABLE); @@ -5493,31 +5499,49 @@ public void scan_ScanWithIndexGiven_WithSerializable_ShouldNotThrowAnyException( // Act Assert DistributedTransaction transaction = manager.begin(); - List results = - transaction.scan( - Scan.newBuilder() - .namespace(namespace1) - .table(TABLE_1) - .indexKey(Key.ofInt(BALANCE, INITIAL_BALANCE)) - .build()); + // Index scans are not allowed in SERIALIZABLE isolation + assertThatThrownBy( + () -> + transaction.scan( + Scan.newBuilder() + .namespace(namespace1) + .table(TABLE_1) + .indexKey(Key.ofInt(BALANCE, INITIAL_BALANCE)) + .build())) + .isInstanceOf(IllegalArgumentException.class); + } - assertThat(results).hasSize(5); + @Test + void get_GetWithIndexGiven_WithSerializable_ShouldThrowIllegalArgumentException() + throws TransactionException { + // Arrange + ConsensusCommitManager manager = createConsensusCommitManager(Isolation.SERIALIZABLE); + manager.insert( + Insert.newBuilder() + .namespace(namespace1) + .table(TABLE_1) + .partitionKey(Key.ofInt(ACCOUNT_ID, 0)) + .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 0)) + .intValue(BALANCE, INITIAL_BALANCE) + .build()); - Set expectedIds = Sets.newHashSet(0, 1, 2, 3, 4); - for (Result result : results) { - expectedIds.remove(result.getInt(ACCOUNT_ID)); - assertThat(result.getInt(ACCOUNT_TYPE)).isEqualTo(0); - assertThat(result.getInt(BALANCE)).isEqualTo(INITIAL_BALANCE); - } - assertThat(expectedIds).isEmpty(); + // Act Assert + DistributedTransaction transaction = manager.begin(); - assertThatCode(transaction::commit).doesNotThrowAnyException(); + // Index gets are not allowed in SERIALIZABLE isolation + assertThatThrownBy( + () -> + transaction.get( + Get.newBuilder() + .namespace(namespace1) + .table(TABLE_1) + .indexKey(Key.ofInt(BALANCE, INITIAL_BALANCE)) + .build())) + .isInstanceOf(IllegalArgumentException.class); } @Test - public void - scan_ScanWithIndexGiven_RecordUpdatedByAnotherTransaction_WithSerializable_ShouldThrowCommitConflictException() - throws TransactionException { + void getScanner_WithSerializable_ShouldNotThrowAnyException() throws TransactionException { // Arrange ConsensusCommitManager manager = createConsensusCommitManager(Isolation.SERIALIZABLE); manager.mutate( @@ -5532,68 +5556,43 @@ public void scan_ScanWithIndexGiven_WithSerializable_ShouldNotThrowAnyException( Insert.newBuilder() .namespace(namespace1) .table(TABLE_1) - .partitionKey(Key.ofInt(ACCOUNT_ID, 1)) - .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 0)) - .intValue(BALANCE, INITIAL_BALANCE) - .build(), - Insert.newBuilder() - .namespace(namespace1) - .table(TABLE_1) - .partitionKey(Key.ofInt(ACCOUNT_ID, 2)) - .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 0)) - .intValue(BALANCE, INITIAL_BALANCE) - .build(), - Insert.newBuilder() - .namespace(namespace1) - .table(TABLE_1) - .partitionKey(Key.ofInt(ACCOUNT_ID, 3)) - .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 0)) + .partitionKey(Key.ofInt(ACCOUNT_ID, 0)) + .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 1)) .intValue(BALANCE, INITIAL_BALANCE) .build(), Insert.newBuilder() .namespace(namespace1) .table(TABLE_1) - .partitionKey(Key.ofInt(ACCOUNT_ID, 4)) - .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 0)) + .partitionKey(Key.ofInt(ACCOUNT_ID, 0)) + .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 2)) .intValue(BALANCE, INITIAL_BALANCE) .build())); - // Act Assert + Scan scan = prepareScan(0, namespace1, TABLE_1); DistributedTransaction transaction = manager.begin(); - List results = - transaction.scan( - Scan.newBuilder() - .namespace(namespace1) - .table(TABLE_1) - .indexKey(Key.ofInt(BALANCE, INITIAL_BALANCE)) - .build()); - assertThat(results).hasSize(5); + // Act Assert + TransactionCrudOperable.Scanner scanner = transaction.getScanner(scan); + Optional result1 = scanner.one(); + assertThat(result1).isNotEmpty(); + assertThat(result1.get().getInt(ACCOUNT_ID)).isEqualTo(0); + assertThat(result1.get().getInt(ACCOUNT_TYPE)).isEqualTo(0); + assertThat(result1.get().getInt(BALANCE)).isEqualTo(INITIAL_BALANCE); - Set expectedIds = Sets.newHashSet(0, 1, 2, 3, 4); - for (Result result : results) { - expectedIds.remove(result.getInt(ACCOUNT_ID)); - assertThat(result.getInt(ACCOUNT_TYPE)).isEqualTo(0); - assertThat(result.getInt(BALANCE)).isEqualTo(INITIAL_BALANCE); - } - assertThat(expectedIds).isEmpty(); + Optional result2 = scanner.one(); + assertThat(result2).isNotEmpty(); + assertThat(result2.get().getInt(ACCOUNT_ID)).isEqualTo(0); + assertThat(result2.get().getInt(ACCOUNT_TYPE)).isEqualTo(1); + assertThat(result2.get().getInt(BALANCE)).isEqualTo(INITIAL_BALANCE); - // The record is updated by another transaction - manager.update( - Update.newBuilder() - .namespace(namespace1) - .table(TABLE_1) - .partitionKey(Key.ofInt(ACCOUNT_ID, 1)) - .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 0)) - .intValue(BALANCE, 100) - .build()); + scanner.close(); - assertThatThrownBy(transaction::commit).isInstanceOf(CommitConflictException.class); + assertThatCode(transaction::commit).doesNotThrowAnyException(); } @Test - public void - scan_ScanWithIndexGiven_RecordUpdatedByMyself_WithSerializable_ShouldNotThrowAnyException() + void + getScanner_FirstInsertedRecordByAnotherTransaction_WithSerializable_ShouldNotThrowCommitConflictException() throws TransactionException { // Arrange ConsensusCommitManager manager = createConsensusCommitManager(Isolation.SERIALIZABLE); @@ -5603,74 +5602,53 @@ public void scan_ScanWithIndexGiven_WithSerializable_ShouldNotThrowAnyException( .namespace(namespace1) .table(TABLE_1) .partitionKey(Key.ofInt(ACCOUNT_ID, 0)) - .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 0)) - .intValue(BALANCE, INITIAL_BALANCE) - .build(), - Insert.newBuilder() - .namespace(namespace1) - .table(TABLE_1) - .partitionKey(Key.ofInt(ACCOUNT_ID, 1)) - .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 0)) - .intValue(BALANCE, INITIAL_BALANCE) - .build(), - Insert.newBuilder() - .namespace(namespace1) - .table(TABLE_1) - .partitionKey(Key.ofInt(ACCOUNT_ID, 2)) - .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 0)) - .intValue(BALANCE, INITIAL_BALANCE) - .build(), - Insert.newBuilder() - .namespace(namespace1) - .table(TABLE_1) - .partitionKey(Key.ofInt(ACCOUNT_ID, 3)) - .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 0)) + .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 1)) .intValue(BALANCE, INITIAL_BALANCE) .build(), Insert.newBuilder() .namespace(namespace1) .table(TABLE_1) - .partitionKey(Key.ofInt(ACCOUNT_ID, 4)) - .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 0)) + .partitionKey(Key.ofInt(ACCOUNT_ID, 0)) + .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 2)) .intValue(BALANCE, INITIAL_BALANCE) .build())); - // Act Assert + Scan scan = prepareScan(0, namespace1, TABLE_1); DistributedTransaction transaction = manager.begin(); - List results = - transaction.scan( - Scan.newBuilder() - .namespace(namespace1) - .table(TABLE_1) - .indexKey(Key.ofInt(BALANCE, INITIAL_BALANCE)) - .build()); - assertThat(results).hasSize(5); + // Act Assert + TransactionCrudOperable.Scanner scanner = transaction.getScanner(scan); + Optional result1 = scanner.one(); + assertThat(result1).isNotEmpty(); + assertThat(result1.get().getInt(ACCOUNT_ID)).isEqualTo(0); + assertThat(result1.get().getInt(ACCOUNT_TYPE)).isEqualTo(1); + assertThat(result1.get().getInt(BALANCE)).isEqualTo(INITIAL_BALANCE); - Set expectedIds = Sets.newHashSet(0, 1, 2, 3, 4); - for (Result result : results) { - expectedIds.remove(result.getInt(ACCOUNT_ID)); - assertThat(result.getInt(ACCOUNT_TYPE)).isEqualTo(0); - assertThat(result.getInt(BALANCE)).isEqualTo(INITIAL_BALANCE); - } - assertThat(expectedIds).isEmpty(); + Optional result2 = scanner.one(); + assertThat(result2).isNotEmpty(); + assertThat(result2.get().getInt(ACCOUNT_ID)).isEqualTo(0); + assertThat(result2.get().getInt(ACCOUNT_TYPE)).isEqualTo(2); + assertThat(result2.get().getInt(BALANCE)).isEqualTo(INITIAL_BALANCE); - transaction.update( - Update.newBuilder() + scanner.close(); + + DistributedTransaction another = manager.begin(); + another.insert( + Insert.newBuilder() .namespace(namespace1) .table(TABLE_1) - .partitionKey(Key.ofInt(ACCOUNT_ID, 1)) + .partitionKey(Key.ofInt(ACCOUNT_ID, 0)) .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 0)) - .intValue(BALANCE, 100) + .intValue(BALANCE, INITIAL_BALANCE) .build()); + another.commit(); - assertThatCode(transaction::commit).doesNotThrowAnyException(); + assertThatThrownBy(transaction::commit).isInstanceOf(CommitConflictException.class); } @Test - public void - scan_ScanWithIndexGiven_RecordDeletedByAnotherTransaction_WithSerializable_ShouldThrowCommitConflictException() - throws TransactionException { + void getScanner_RecordInsertedByAnotherTransaction_WithSerializable_ShouldNotThrowAnyException() + throws TransactionException { // Arrange ConsensusCommitManager manager = createConsensusCommitManager(Isolation.SERIALIZABLE); manager.mutate( @@ -5685,570 +5663,47 @@ public void scan_ScanWithIndexGiven_WithSerializable_ShouldNotThrowAnyException( Insert.newBuilder() .namespace(namespace1) .table(TABLE_1) - .partitionKey(Key.ofInt(ACCOUNT_ID, 1)) - .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 0)) - .intValue(BALANCE, INITIAL_BALANCE) - .build(), - Insert.newBuilder() - .namespace(namespace1) - .table(TABLE_1) - .partitionKey(Key.ofInt(ACCOUNT_ID, 2)) - .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 0)) - .intValue(BALANCE, INITIAL_BALANCE) - .build(), - Insert.newBuilder() - .namespace(namespace1) - .table(TABLE_1) - .partitionKey(Key.ofInt(ACCOUNT_ID, 3)) - .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 0)) - .intValue(BALANCE, INITIAL_BALANCE) - .build(), - Insert.newBuilder() - .namespace(namespace1) - .table(TABLE_1) - .partitionKey(Key.ofInt(ACCOUNT_ID, 4)) - .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 0)) + .partitionKey(Key.ofInt(ACCOUNT_ID, 0)) + .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 1)) .intValue(BALANCE, INITIAL_BALANCE) .build())); - // Act Assert + Scan scan = prepareScan(0, namespace1, TABLE_1); DistributedTransaction transaction = manager.begin(); - List results = - transaction.scan( - Scan.newBuilder() - .namespace(namespace1) - .table(TABLE_1) - .indexKey(Key.ofInt(BALANCE, INITIAL_BALANCE)) - .build()); - assertThat(results).hasSize(5); + // Act Assert + TransactionCrudOperable.Scanner scanner = transaction.getScanner(scan); + Optional result1 = scanner.one(); + assertThat(result1).isNotEmpty(); + assertThat(result1.get().getInt(ACCOUNT_ID)).isEqualTo(0); + assertThat(result1.get().getInt(ACCOUNT_TYPE)).isEqualTo(0); + assertThat(result1.get().getInt(BALANCE)).isEqualTo(INITIAL_BALANCE); - Set expectedIds = Sets.newHashSet(0, 1, 2, 3, 4); - for (Result result : results) { - expectedIds.remove(result.getInt(ACCOUNT_ID)); - assertThat(result.getInt(ACCOUNT_TYPE)).isEqualTo(0); - assertThat(result.getInt(BALANCE)).isEqualTo(INITIAL_BALANCE); - } - assertThat(expectedIds).isEmpty(); + Optional result2 = scanner.one(); + assertThat(result2).isNotEmpty(); + assertThat(result2.get().getInt(ACCOUNT_ID)).isEqualTo(0); + assertThat(result2.get().getInt(ACCOUNT_TYPE)).isEqualTo(1); + assertThat(result2.get().getInt(BALANCE)).isEqualTo(INITIAL_BALANCE); - // The record is deleted by another transaction - manager.delete( - Delete.newBuilder() + scanner.close(); + + DistributedTransaction another = manager.begin(); + another.insert( + Insert.newBuilder() .namespace(namespace1) .table(TABLE_1) - .partitionKey(Key.ofInt(ACCOUNT_ID, 1)) - .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 0)) + .partitionKey(Key.ofInt(ACCOUNT_ID, 0)) + .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 2)) + .intValue(BALANCE, INITIAL_BALANCE) .build()); + another.commit(); - assertThatThrownBy(transaction::commit).isInstanceOf(CommitConflictException.class); + assertThatCode(transaction::commit).doesNotThrowAnyException(); } @Test - public void - scan_ScanWithIndexGiven_RecordDeletedByMyself_WithSerializable_ShouldNotThrowAnyException() - throws TransactionException { - // Arrange - ConsensusCommitManager manager = createConsensusCommitManager(Isolation.SERIALIZABLE); - manager.mutate( - Arrays.asList( - Insert.newBuilder() - .namespace(namespace1) - .table(TABLE_1) - .partitionKey(Key.ofInt(ACCOUNT_ID, 0)) - .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 0)) - .intValue(BALANCE, INITIAL_BALANCE) - .build(), - Insert.newBuilder() - .namespace(namespace1) - .table(TABLE_1) - .partitionKey(Key.ofInt(ACCOUNT_ID, 1)) - .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 0)) - .intValue(BALANCE, INITIAL_BALANCE) - .build(), - Insert.newBuilder() - .namespace(namespace1) - .table(TABLE_1) - .partitionKey(Key.ofInt(ACCOUNT_ID, 2)) - .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 0)) - .intValue(BALANCE, INITIAL_BALANCE) - .build(), - Insert.newBuilder() - .namespace(namespace1) - .table(TABLE_1) - .partitionKey(Key.ofInt(ACCOUNT_ID, 3)) - .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 0)) - .intValue(BALANCE, INITIAL_BALANCE) - .build(), - Insert.newBuilder() - .namespace(namespace1) - .table(TABLE_1) - .partitionKey(Key.ofInt(ACCOUNT_ID, 4)) - .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 0)) - .intValue(BALANCE, INITIAL_BALANCE) - .build())); - - // Act Assert - DistributedTransaction transaction = manager.begin(); - List results = - transaction.scan( - Scan.newBuilder() - .namespace(namespace1) - .table(TABLE_1) - .indexKey(Key.ofInt(BALANCE, INITIAL_BALANCE)) - .build()); - - assertThat(results).hasSize(5); - - Set expectedIds = Sets.newHashSet(0, 1, 2, 3, 4); - for (Result result : results) { - expectedIds.remove(result.getInt(ACCOUNT_ID)); - assertThat(result.getInt(ACCOUNT_TYPE)).isEqualTo(0); - assertThat(result.getInt(BALANCE)).isEqualTo(INITIAL_BALANCE); - } - assertThat(expectedIds).isEmpty(); - - transaction.delete( - Delete.newBuilder() - .namespace(namespace1) - .table(TABLE_1) - .partitionKey(Key.ofInt(ACCOUNT_ID, 1)) - .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 0)) - .build()); - - assertThatCode(transaction::commit).doesNotThrowAnyException(); - } - - @Test - public void scan_ScanWithIndexWithLimitGiven_WithSerializable_ShouldNotThrowAnyException() - throws TransactionException { - // Arrange - ConsensusCommitManager manager = createConsensusCommitManager(Isolation.SERIALIZABLE); - manager.mutate( - Arrays.asList( - Insert.newBuilder() - .namespace(namespace1) - .table(TABLE_1) - .partitionKey(Key.ofInt(ACCOUNT_ID, 0)) - .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 0)) - .intValue(BALANCE, INITIAL_BALANCE) - .build(), - Insert.newBuilder() - .namespace(namespace1) - .table(TABLE_1) - .partitionKey(Key.ofInt(ACCOUNT_ID, 1)) - .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 0)) - .intValue(BALANCE, INITIAL_BALANCE) - .build(), - Insert.newBuilder() - .namespace(namespace1) - .table(TABLE_1) - .partitionKey(Key.ofInt(ACCOUNT_ID, 2)) - .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 0)) - .intValue(BALANCE, INITIAL_BALANCE) - .build(), - Insert.newBuilder() - .namespace(namespace1) - .table(TABLE_1) - .partitionKey(Key.ofInt(ACCOUNT_ID, 3)) - .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 0)) - .intValue(BALANCE, INITIAL_BALANCE) - .build(), - Insert.newBuilder() - .namespace(namespace1) - .table(TABLE_1) - .partitionKey(Key.ofInt(ACCOUNT_ID, 4)) - .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 0)) - .intValue(BALANCE, INITIAL_BALANCE) - .build())); - - // Act Assert - DistributedTransaction transaction = manager.begin(); - List results = - transaction.scan( - Scan.newBuilder() - .namespace(namespace1) - .table(TABLE_1) - .indexKey(Key.ofInt(BALANCE, INITIAL_BALANCE)) - .limit(3) - .build()); - - assertThat(results).hasSize(3); - - Set expectedIds = Sets.newHashSet(0, 1, 2, 3, 4); - for (Result result : results) { - expectedIds.remove(result.getInt(ACCOUNT_ID)); - assertThat(result.getInt(ACCOUNT_TYPE)).isEqualTo(0); - assertThat(result.getInt(BALANCE)).isEqualTo(INITIAL_BALANCE); - } - assertThat(expectedIds).hasSize(2); - - assertThatCode(transaction::commit).doesNotThrowAnyException(); - } - - @Test - public void get_GetWithIndexGiven_WithSerializable_ShouldNotThrowAnyException() - throws TransactionException { - // Arrange - ConsensusCommitManager manager = createConsensusCommitManager(Isolation.SERIALIZABLE); - manager.insert( - Insert.newBuilder() - .namespace(namespace1) - .table(TABLE_1) - .partitionKey(Key.ofInt(ACCOUNT_ID, 0)) - .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 0)) - .intValue(BALANCE, INITIAL_BALANCE) - .build()); - - // Act Assert - DistributedTransaction transaction = manager.begin(); - Optional actual = - transaction.get( - Get.newBuilder() - .namespace(namespace1) - .table(TABLE_1) - .indexKey(Key.ofInt(BALANCE, INITIAL_BALANCE)) - .build()); - - assertThat(actual).isPresent(); - assertThat(actual.get().getInt(ACCOUNT_ID)).isEqualTo(0); - assertThat(actual.get().getInt(ACCOUNT_TYPE)).isEqualTo(0); - assertThat(actual.get().getInt(BALANCE)).isEqualTo(INITIAL_BALANCE); - - assertThatCode(transaction::commit).doesNotThrowAnyException(); - } - - @Test - public void - get_GetWithIndexGiven_RecordUpdatedByAnotherTransaction_WithSerializable_ShouldThrowCommitConflictException() - throws TransactionException { - // Arrange - ConsensusCommitManager manager = createConsensusCommitManager(Isolation.SERIALIZABLE); - manager.insert( - Insert.newBuilder() - .namespace(namespace1) - .table(TABLE_1) - .partitionKey(Key.ofInt(ACCOUNT_ID, 0)) - .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 0)) - .intValue(BALANCE, INITIAL_BALANCE) - .build()); - - // Act Assert - DistributedTransaction transaction = manager.begin(); - Optional actual = - transaction.get( - Get.newBuilder() - .namespace(namespace1) - .table(TABLE_1) - .indexKey(Key.ofInt(BALANCE, INITIAL_BALANCE)) - .build()); - - assertThat(actual).isPresent(); - assertThat(actual.get().getInt(ACCOUNT_ID)).isEqualTo(0); - assertThat(actual.get().getInt(ACCOUNT_TYPE)).isEqualTo(0); - assertThat(actual.get().getInt(BALANCE)).isEqualTo(INITIAL_BALANCE); - - // The record is updated by another transaction - manager.update( - Update.newBuilder() - .namespace(namespace1) - .table(TABLE_1) - .partitionKey(Key.ofInt(ACCOUNT_ID, 0)) - .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 0)) - .intValue(BALANCE, 100) - .build()); - - assertThatThrownBy(transaction::commit).isInstanceOf(CommitConflictException.class); - } - - @Test - public void - get_GetWithIndexGiven_RecordUpdatedByMyself_WithSerializable_ShouldNotThrowAnyException() - throws TransactionException { - // Arrange - ConsensusCommitManager manager = createConsensusCommitManager(Isolation.SERIALIZABLE); - manager.insert( - Insert.newBuilder() - .namespace(namespace1) - .table(TABLE_1) - .partitionKey(Key.ofInt(ACCOUNT_ID, 0)) - .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 0)) - .intValue(BALANCE, INITIAL_BALANCE) - .build()); - - // Act Assert - DistributedTransaction transaction = manager.begin(); - Optional actual = - transaction.get( - Get.newBuilder() - .namespace(namespace1) - .table(TABLE_1) - .indexKey(Key.ofInt(BALANCE, INITIAL_BALANCE)) - .build()); - - assertThat(actual).isPresent(); - assertThat(actual.get().getInt(ACCOUNT_ID)).isEqualTo(0); - assertThat(actual.get().getInt(ACCOUNT_TYPE)).isEqualTo(0); - assertThat(actual.get().getInt(BALANCE)).isEqualTo(INITIAL_BALANCE); - - transaction.update( - Update.newBuilder() - .namespace(namespace1) - .table(TABLE_1) - .partitionKey(Key.ofInt(ACCOUNT_ID, 0)) - .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 0)) - .intValue(BALANCE, 100) - .build()); - - assertThatCode(transaction::commit).doesNotThrowAnyException(); - } - - @Test - public void - get_GetWithIndexGiven_RecordDeletedByAnotherTransaction_WithSerializable_ShouldThrowCommitConflictException() - throws TransactionException { - // Arrange - ConsensusCommitManager manager = createConsensusCommitManager(Isolation.SERIALIZABLE); - manager.insert( - Insert.newBuilder() - .namespace(namespace1) - .table(TABLE_1) - .partitionKey(Key.ofInt(ACCOUNT_ID, 0)) - .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 0)) - .intValue(BALANCE, INITIAL_BALANCE) - .build()); - - // Act Assert - DistributedTransaction transaction = manager.begin(); - Optional actual = - transaction.get( - Get.newBuilder() - .namespace(namespace1) - .table(TABLE_1) - .indexKey(Key.ofInt(BALANCE, INITIAL_BALANCE)) - .build()); - - assertThat(actual).isPresent(); - assertThat(actual.get().getInt(ACCOUNT_ID)).isEqualTo(0); - assertThat(actual.get().getInt(ACCOUNT_TYPE)).isEqualTo(0); - assertThat(actual.get().getInt(BALANCE)).isEqualTo(INITIAL_BALANCE); - - // The record is deleted by another transaction - manager.delete( - Delete.newBuilder() - .namespace(namespace1) - .table(TABLE_1) - .partitionKey(Key.ofInt(ACCOUNT_ID, 0)) - .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 0)) - .build()); - - assertThatThrownBy(transaction::commit).isInstanceOf(CommitConflictException.class); - } - - @Test - public void - get_GetWithIndexGiven_RecordDeletedByMyself_WithSerializable_ShouldNotThrowAnyException() - throws TransactionException { - // Arrange - ConsensusCommitManager manager = createConsensusCommitManager(Isolation.SERIALIZABLE); - manager.insert( - Insert.newBuilder() - .namespace(namespace1) - .table(TABLE_1) - .partitionKey(Key.ofInt(ACCOUNT_ID, 0)) - .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 0)) - .intValue(BALANCE, INITIAL_BALANCE) - .build()); - - // Act Assert - DistributedTransaction transaction = manager.begin(); - Optional actual = - transaction.get( - Get.newBuilder() - .namespace(namespace1) - .table(TABLE_1) - .indexKey(Key.ofInt(BALANCE, INITIAL_BALANCE)) - .build()); - - assertThat(actual).isPresent(); - assertThat(actual.get().getInt(ACCOUNT_ID)).isEqualTo(0); - assertThat(actual.get().getInt(ACCOUNT_TYPE)).isEqualTo(0); - assertThat(actual.get().getInt(BALANCE)).isEqualTo(INITIAL_BALANCE); - - transaction.delete( - Delete.newBuilder() - .namespace(namespace1) - .table(TABLE_1) - .partitionKey(Key.ofInt(ACCOUNT_ID, 0)) - .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 0)) - .build()); - - assertThatCode(transaction::commit).doesNotThrowAnyException(); - } - - @Test - void getScanner_WithSerializable_ShouldNotThrowAnyException() throws TransactionException { - // Arrange - ConsensusCommitManager manager = createConsensusCommitManager(Isolation.SERIALIZABLE); - manager.mutate( - Arrays.asList( - Insert.newBuilder() - .namespace(namespace1) - .table(TABLE_1) - .partitionKey(Key.ofInt(ACCOUNT_ID, 0)) - .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 0)) - .intValue(BALANCE, INITIAL_BALANCE) - .build(), - Insert.newBuilder() - .namespace(namespace1) - .table(TABLE_1) - .partitionKey(Key.ofInt(ACCOUNT_ID, 0)) - .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 1)) - .intValue(BALANCE, INITIAL_BALANCE) - .build(), - Insert.newBuilder() - .namespace(namespace1) - .table(TABLE_1) - .partitionKey(Key.ofInt(ACCOUNT_ID, 0)) - .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 2)) - .intValue(BALANCE, INITIAL_BALANCE) - .build())); - - Scan scan = prepareScan(0, namespace1, TABLE_1); - DistributedTransaction transaction = manager.begin(); - - // Act Assert - TransactionCrudOperable.Scanner scanner = transaction.getScanner(scan); - Optional result1 = scanner.one(); - assertThat(result1).isNotEmpty(); - assertThat(result1.get().getInt(ACCOUNT_ID)).isEqualTo(0); - assertThat(result1.get().getInt(ACCOUNT_TYPE)).isEqualTo(0); - assertThat(result1.get().getInt(BALANCE)).isEqualTo(INITIAL_BALANCE); - - Optional result2 = scanner.one(); - assertThat(result2).isNotEmpty(); - assertThat(result2.get().getInt(ACCOUNT_ID)).isEqualTo(0); - assertThat(result2.get().getInt(ACCOUNT_TYPE)).isEqualTo(1); - assertThat(result2.get().getInt(BALANCE)).isEqualTo(INITIAL_BALANCE); - - scanner.close(); - - assertThatCode(transaction::commit).doesNotThrowAnyException(); - } - - @Test - void - getScanner_FirstInsertedRecordByAnotherTransaction_WithSerializable_ShouldNotThrowCommitConflictException() - throws TransactionException { - // Arrange - ConsensusCommitManager manager = createConsensusCommitManager(Isolation.SERIALIZABLE); - manager.mutate( - Arrays.asList( - Insert.newBuilder() - .namespace(namespace1) - .table(TABLE_1) - .partitionKey(Key.ofInt(ACCOUNT_ID, 0)) - .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 1)) - .intValue(BALANCE, INITIAL_BALANCE) - .build(), - Insert.newBuilder() - .namespace(namespace1) - .table(TABLE_1) - .partitionKey(Key.ofInt(ACCOUNT_ID, 0)) - .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 2)) - .intValue(BALANCE, INITIAL_BALANCE) - .build())); - - Scan scan = prepareScan(0, namespace1, TABLE_1); - DistributedTransaction transaction = manager.begin(); - - // Act Assert - TransactionCrudOperable.Scanner scanner = transaction.getScanner(scan); - Optional result1 = scanner.one(); - assertThat(result1).isNotEmpty(); - assertThat(result1.get().getInt(ACCOUNT_ID)).isEqualTo(0); - assertThat(result1.get().getInt(ACCOUNT_TYPE)).isEqualTo(1); - assertThat(result1.get().getInt(BALANCE)).isEqualTo(INITIAL_BALANCE); - - Optional result2 = scanner.one(); - assertThat(result2).isNotEmpty(); - assertThat(result2.get().getInt(ACCOUNT_ID)).isEqualTo(0); - assertThat(result2.get().getInt(ACCOUNT_TYPE)).isEqualTo(2); - assertThat(result2.get().getInt(BALANCE)).isEqualTo(INITIAL_BALANCE); - - scanner.close(); - - DistributedTransaction another = manager.begin(); - another.insert( - Insert.newBuilder() - .namespace(namespace1) - .table(TABLE_1) - .partitionKey(Key.ofInt(ACCOUNT_ID, 0)) - .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 0)) - .intValue(BALANCE, INITIAL_BALANCE) - .build()); - another.commit(); - - assertThatThrownBy(transaction::commit).isInstanceOf(CommitConflictException.class); - } - - @Test - void getScanner_RecordInsertedByAnotherTransaction_WithSerializable_ShouldNotThrowAnyException() - throws TransactionException { - // Arrange - ConsensusCommitManager manager = createConsensusCommitManager(Isolation.SERIALIZABLE); - manager.mutate( - Arrays.asList( - Insert.newBuilder() - .namespace(namespace1) - .table(TABLE_1) - .partitionKey(Key.ofInt(ACCOUNT_ID, 0)) - .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 0)) - .intValue(BALANCE, INITIAL_BALANCE) - .build(), - Insert.newBuilder() - .namespace(namespace1) - .table(TABLE_1) - .partitionKey(Key.ofInt(ACCOUNT_ID, 0)) - .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 1)) - .intValue(BALANCE, INITIAL_BALANCE) - .build())); - - Scan scan = prepareScan(0, namespace1, TABLE_1); - DistributedTransaction transaction = manager.begin(); - - // Act Assert - TransactionCrudOperable.Scanner scanner = transaction.getScanner(scan); - Optional result1 = scanner.one(); - assertThat(result1).isNotEmpty(); - assertThat(result1.get().getInt(ACCOUNT_ID)).isEqualTo(0); - assertThat(result1.get().getInt(ACCOUNT_TYPE)).isEqualTo(0); - assertThat(result1.get().getInt(BALANCE)).isEqualTo(INITIAL_BALANCE); - - Optional result2 = scanner.one(); - assertThat(result2).isNotEmpty(); - assertThat(result2.get().getInt(ACCOUNT_ID)).isEqualTo(0); - assertThat(result2.get().getInt(ACCOUNT_TYPE)).isEqualTo(1); - assertThat(result2.get().getInt(BALANCE)).isEqualTo(INITIAL_BALANCE); - - scanner.close(); - - DistributedTransaction another = manager.begin(); - another.insert( - Insert.newBuilder() - .namespace(namespace1) - .table(TABLE_1) - .partitionKey(Key.ofInt(ACCOUNT_ID, 0)) - .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 2)) - .intValue(BALANCE, INITIAL_BALANCE) - .build()); - another.commit(); - - assertThatCode(transaction::commit).doesNotThrowAnyException(); - } - - @Test - void - getScanner_RecordUpdatedByAnotherTransaction_WithSerializable_ShouldThrowCommitConflictException() + void + getScanner_RecordUpdatedByAnotherTransaction_WithSerializable_ShouldThrowCommitConflictException() throws TransactionException { // Arrange ConsensusCommitManager manager = createConsensusCommitManager(Isolation.SERIALIZABLE); @@ -6302,181 +5757,9 @@ void getScanner_RecordInsertedByAnotherTransaction_WithSerializable_ShouldNotThr assertThatThrownBy(transaction::commit).isInstanceOf(CommitConflictException.class); } - @Test - public void - get_GetWithIndexGiven_NoRecordsInIndexRange_WithSerializable_ShouldNotThrowAnyException() - throws TransactionException { - // Arrange - ConsensusCommitManager manager = createConsensusCommitManager(Isolation.SERIALIZABLE); - - // Act Assert - DistributedTransaction transaction = manager.begin(); - Optional actual = - transaction.get( - Get.newBuilder() - .namespace(namespace1) - .table(TABLE_1) - .indexKey(Key.ofInt(BALANCE, INITIAL_BALANCE)) - .build()); - - assertThat(actual).isEmpty(); - - assertThatCode(transaction::commit).doesNotThrowAnyException(); - } - - @Test - public void - get_GetWithIndexGiven_RecordInsertedIntoIndexRangeByMyself_WithSerializable_ShouldNotThrowAnyException() - throws TransactionException { - // Arrange - ConsensusCommitManager manager = createConsensusCommitManager(Isolation.SERIALIZABLE); - manager.insert( - Insert.newBuilder() - .namespace(namespace1) - .table(TABLE_1) - .partitionKey(Key.ofInt(ACCOUNT_ID, 0)) - .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 0)) - .intValue(BALANCE, INITIAL_BALANCE) - .build()); - - // Act Assert - DistributedTransaction transaction = manager.begin(); - Optional actual = - transaction.get( - Get.newBuilder() - .namespace(namespace1) - .table(TABLE_1) - .indexKey(Key.ofInt(BALANCE, INITIAL_BALANCE)) - .build()); - - assertThat(actual).isPresent(); - assertThat(actual.get().getInt(ACCOUNT_ID)).isEqualTo(0); - assertThat(actual.get().getInt(ACCOUNT_TYPE)).isEqualTo(0); - assertThat(actual.get().getInt(BALANCE)).isEqualTo(INITIAL_BALANCE); - - transaction.insert( - Insert.newBuilder() - .namespace(namespace1) - .table(TABLE_1) - .partitionKey(Key.ofInt(ACCOUNT_ID, 1)) - .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 0)) - .intValue(BALANCE, INITIAL_BALANCE) - .build()); - - assertThatCode(transaction::commit).doesNotThrowAnyException(); - } - - @Test - public void - get_GetWithIndexGiven_RecordInsertedIntoIndexRangeByAnotherTransaction_WithSerializable_ShouldThrowCommitConflictException() - throws TransactionException { - // Arrange - ConsensusCommitManager manager = createConsensusCommitManager(Isolation.SERIALIZABLE); - manager.insert( - Insert.newBuilder() - .namespace(namespace1) - .table(TABLE_1) - .partitionKey(Key.ofInt(ACCOUNT_ID, 0)) - .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 0)) - .intValue(BALANCE, INITIAL_BALANCE) - .build()); - - // Act Assert - DistributedTransaction transaction = manager.begin(); - Optional actual = - transaction.get( - Get.newBuilder() - .namespace(namespace1) - .table(TABLE_1) - .indexKey(Key.ofInt(BALANCE, INITIAL_BALANCE)) - .build()); - - assertThat(actual).isPresent(); - assertThat(actual.get().getInt(ACCOUNT_ID)).isEqualTo(0); - assertThat(actual.get().getInt(ACCOUNT_TYPE)).isEqualTo(0); - assertThat(actual.get().getInt(BALANCE)).isEqualTo(INITIAL_BALANCE); - - DistributedTransaction another = manager.begin(); - another.insert( - Insert.newBuilder() - .namespace(namespace1) - .table(TABLE_1) - .partitionKey(Key.ofInt(ACCOUNT_ID, 1)) - .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 0)) - .intValue(BALANCE, INITIAL_BALANCE) - .build()); - another.commit(); - - assertThatThrownBy(transaction::commit).isInstanceOf(CommitConflictException.class); - } - - @Test - public void - get_GetWithIndexGiven_NoRecordsInIndexRange_RecordInsertedIntoIndexRangeByMyself_WithSerializable_ShouldNotThrowAnyException() - throws TransactionException { - // Arrange - ConsensusCommitManager manager = createConsensusCommitManager(Isolation.SERIALIZABLE); - - // Act Assert - DistributedTransaction transaction = manager.begin(); - Optional actual = - transaction.get( - Get.newBuilder() - .namespace(namespace1) - .table(TABLE_1) - .indexKey(Key.ofInt(BALANCE, INITIAL_BALANCE)) - .build()); - - assertThat(actual).isEmpty(); - - transaction.insert( - Insert.newBuilder() - .namespace(namespace1) - .table(TABLE_1) - .partitionKey(Key.ofInt(ACCOUNT_ID, 0)) - .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 0)) - .intValue(BALANCE, INITIAL_BALANCE) - .build()); - - assertThatCode(transaction::commit).doesNotThrowAnyException(); - } - - @Test - public void - get_GetWithIndexGiven_NoRecordsInIndexRange_RecordInsertedIntoIndexRangeByAnotherTransaction_WithSerializable_ShouldThrowCommitConflictException() - throws TransactionException { - // Arrange - ConsensusCommitManager manager = createConsensusCommitManager(Isolation.SERIALIZABLE); - - // Act Assert - DistributedTransaction transaction = manager.begin(); - Optional actual = - transaction.get( - Get.newBuilder() - .namespace(namespace1) - .table(TABLE_1) - .indexKey(Key.ofInt(BALANCE, INITIAL_BALANCE)) - .build()); - - assertThat(actual).isEmpty(); - - DistributedTransaction another = manager.begin(); - another.insert( - Insert.newBuilder() - .namespace(namespace1) - .table(TABLE_1) - .partitionKey(Key.ofInt(ACCOUNT_ID, 0)) - .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 0)) - .intValue(BALANCE, INITIAL_BALANCE) - .build()); - another.commit(); - - assertThatThrownBy(transaction::commit).isInstanceOf(CommitConflictException.class); - } - @ParameterizedTest - @EnumSource(Isolation.class) - public void getAndUpdate_GetWithIndexGiven_ShouldUpdate(Isolation isolation) + @EnumSource(value = Isolation.class, mode = EnumSource.Mode.EXCLUDE, names = "SERIALIZABLE") + void getAndUpdate_GetWithIndexGiven_ShouldUpdate(Isolation isolation) throws TransactionException { // Arrange ConsensusCommitManager manager = createConsensusCommitManager(isolation); @@ -6530,8 +5813,8 @@ public void getAndUpdate_GetWithIndexGiven_ShouldUpdate(Isolation isolation) } @ParameterizedTest - @EnumSource(Isolation.class) - public void scanAndUpdate_ScanWithIndexGiven_ShouldUpdate(Isolation isolation) + @EnumSource(value = Isolation.class, mode = EnumSource.Mode.EXCLUDE, names = "SERIALIZABLE") + void scanAndUpdate_ScanWithIndexGiven_ShouldUpdate(Isolation isolation) throws TransactionException { // Arrange ConsensusCommitManager manager = createConsensusCommitManager(isolation); @@ -8499,6 +7782,7 @@ private ConsensusCommitManager createConsensusCommitManager( return new ConsensusCommitManager( storage, admin, + consensusCommitConfig, databaseConfig, coordinator, parallelExecutor,