Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,12 +1,22 @@
package com.scalar.db.transaction.consensuscommit;

import com.google.common.collect.ImmutableMap;
import com.scalar.db.api.AndConditionSet;
import com.scalar.db.api.ConditionBuilder;
import com.scalar.db.api.ConditionSetBuilder;
import com.scalar.db.api.ConditionalExpression;
import com.scalar.db.api.Consistency;
import com.scalar.db.api.Get;
import com.scalar.db.api.GetBuilder;
import com.scalar.db.api.Insert;
import com.scalar.db.api.LikeExpression;
import com.scalar.db.api.MutationCondition;
import com.scalar.db.api.Operation;
import com.scalar.db.api.Put;
import com.scalar.db.api.PutBuilder;
import com.scalar.db.api.Scan;
import com.scalar.db.api.ScanBuilder;
import com.scalar.db.api.Selection;
import com.scalar.db.api.TableMetadata;
import com.scalar.db.api.Update;
import com.scalar.db.api.UpdateIf;
Expand Down Expand Up @@ -352,4 +362,149 @@ static TransactionTableMetadata getTransactionTableMetadata(
}
return metadata;
}

static Get prepareGetForStorage(Get get, TableMetadata metadata) {
GetBuilder.BuildableGetOrGetWithIndexFromExisting builder =
Get.newBuilder(get).clearProjections().consistency(Consistency.LINEARIZABLE);

if (!get.getConjunctions().isEmpty()) {
// If there are conjunctions, we need to convert them to include conditions on the before
// image
Set<AndConditionSet> converted = convertConjunctions(get.getConjunctions(), metadata);
return builder.clearConditions().whereOr(converted).build();
}

return builder.build();
}

static Scan prepareScanForStorage(Scan scan, TableMetadata metadata) {
ScanBuilder.BuildableScanOrScanAllFromExisting builder =
Scan.newBuilder(scan).clearProjections().consistency(Consistency.LINEARIZABLE);

if (scan.getLimit() > 0) {
// Since the recovery process and the conjunction processing may exclude some records from
// the scan result, it is necessary to perform the scan without a limit.
builder.limit(0);
}

if (!scan.getConjunctions().isEmpty()) {
// If there are conjunctions, we need to convert them to include conditions on the before
// image
Set<AndConditionSet> converted = convertConjunctions(scan.getConjunctions(), metadata);
return builder.clearConditions().whereOr(converted).build();
}

return builder.build();
}

/**
* Converts the given conjunctions to include conditions on before images.
*
* <p>This is necessary because we might miss prepared records whose before images match the
* original conditions when reading from storage. For example, suppose we have the following
* records in storage:
*
* <pre>
* | partition_key | clustering_key | column | status | before_column | before_status |
* |---------------|----------------|--------|-----------|---------------|----------------|
* | 0 | 0 | 1000 | COMMITTED | | |
* | 0 | 1 | 200 | PREPARED | 1000 | COMMITTED |
* </pre>
*
* If we scan records with the condition "column = 1000" without converting the condition
* (conjunction), we only get the first record, not the second one, because the condition does not
* match. However, the second record has not been committed yet, so we should still retrieve it,
* considering the possibility that the record will be rolled back.
*
* <p>To handle such cases, we convert the conjunctions to include conditions on the before image.
* For example, if the original condition is:
*
* <pre>
* column = 1000
* </pre>
*
* We convert it to:
*
* <pre>
* column = 1000 OR before_column = 1000
* </pre>
*
* <p>Here are more examples:
*
* <p>Example 1:
*
* <pre>
* {@code column >= 500 AND column < 1000}
* </pre>
*
* becomes:
*
* <pre>
* {@code (column >= 500 AND column < 1000) OR (before_column >= 500 AND before_column < 1000)}
* </pre>
*
* <p>Example 2:
*
* <pre>
* {@code column1 = 500 OR column2 != 1000}
* </pre>
*
* becomes:
*
* <pre>
* {@code column1 = 500 OR column2 != 1000 OR before_column1 = 500 OR before_column2 != 1000}
* </pre>
*
* This way, we can ensure that prepared records whose before images satisfy the original scan
* conditions are not missed during the scan.
*
* @param conjunctions the conjunctions to convert
* @param metadata the table metadata of the target table
* @return the converted conjunctions
*/
private static Set<AndConditionSet> convertConjunctions(
Set<Selection.Conjunction> conjunctions, TableMetadata metadata) {
Set<AndConditionSet> converted = new HashSet<>(conjunctions.size() * 2);

// Keep the original conjunctions
conjunctions.forEach(
c -> converted.add(ConditionSetBuilder.andConditionSet(c.getConditions()).build()));

// Add conditions on the before image
for (Selection.Conjunction conjunction : conjunctions) {
Set<ConditionalExpression> conditions = new HashSet<>(conjunction.getConditions().size());
for (ConditionalExpression condition : conjunction.getConditions()) {
String columnName = condition.getColumn().getName();

if (metadata.getPartitionKeyNames().contains(columnName)
|| metadata.getClusteringKeyNames().contains(columnName)) {
// If the condition is on the primary key, we don't need to convert it
conditions.add(condition);
continue;
}

// Convert the condition to use the before image column
ConditionalExpression convertedCondition;
if (condition instanceof LikeExpression) {
LikeExpression likeExpression = (LikeExpression) condition;
convertedCondition =
ConditionBuilder.buildLikeExpression(
likeExpression.getColumn().copyWith(Attribute.BEFORE_PREFIX + columnName),
likeExpression.getOperator(),
likeExpression.getEscape());
} else {
convertedCondition =
ConditionBuilder.buildConditionalExpression(
condition.getColumn().copyWith(Attribute.BEFORE_PREFIX + columnName),
condition.getOperator());
}

conditions.add(convertedCondition);
}

converted.add(ConditionSetBuilder.andConditionSet(conditions).build());
}

return converted;
}
}
Loading