Skip to content

Commit b8b1187

Browse files
Backport to branch(3.16) : Introduce TransactionContext in Consensus Commit (#3052)
Co-authored-by: Toshihiro Suzuki <brfrn169@gmail.com>
1 parent b304859 commit b8b1187

27 files changed

+1500
-1553
lines changed

core/src/main/java/com/scalar/db/common/CoreError.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -925,10 +925,10 @@ public enum CoreError implements ScalarDbError {
925925
Category.INTERNAL_ERROR, "0044", "The Upsert operation failed. Details: %s", "", ""),
926926
JDBC_TRANSACTION_UPDATE_OPERATION_FAILED(
927927
Category.INTERNAL_ERROR, "0045", "The Update operation failed. Details: %s", "", ""),
928-
HANDLING_BEFORE_PREPARATION_SNAPSHOT_HOOK_FAILED(
928+
CONSENSUS_COMMIT_HANDLING_BEFORE_PREPARATION_HOOK_FAILED(
929929
Category.INTERNAL_ERROR,
930930
"0046",
931-
"Handling the before-preparation snapshot hook failed. Details: %s",
931+
"Handling the before-preparation hook failed. Details: %s",
932932
"",
933933
""),
934934
JDBC_TRANSACTION_GETTING_SCANNER_FAILED(
Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,7 @@
22

33
import java.util.concurrent.Future;
44

5-
public interface BeforePreparationSnapshotHook {
5+
public interface BeforePreparationHook {
66
Future<Void> handle(
7-
TransactionTableMetadataManager transactionTableMetadataManager,
8-
Snapshot.ReadWriteSets readWriteSets);
7+
TransactionTableMetadataManager transactionTableMetadataManager, TransactionContext context);
98
}

core/src/main/java/com/scalar/db/transaction/consensuscommit/CommitHandler.java

Lines changed: 111 additions & 94 deletions
Large diffs are not rendered by default.

core/src/main/java/com/scalar/db/transaction/consensuscommit/CommitHandlerWithGroupCommit.java

Lines changed: 27 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -48,55 +48,57 @@ public CommitHandlerWithGroupCommit(
4848
}
4949

5050
@Override
51-
public void commit(Snapshot snapshot, boolean readOnly)
51+
public void commit(TransactionContext context)
5252
throws CommitException, UnknownTransactionStatusException {
53-
if (!readOnly && !snapshot.hasWritesOrDeletes() && coordinatorWriteOmissionOnReadOnlyEnabled) {
54-
cancelGroupCommitIfNeeded(snapshot.getId());
53+
if (!context.readOnly
54+
&& !context.snapshot.hasWritesOrDeletes()
55+
&& coordinatorWriteOmissionOnReadOnlyEnabled) {
56+
cancelGroupCommitIfNeeded(context.transactionId);
5557
}
5658

57-
super.commit(snapshot, readOnly);
59+
super.commit(context);
5860
}
5961

6062
@Override
61-
boolean canOnePhaseCommit(Snapshot snapshot) throws CommitException {
63+
boolean canOnePhaseCommit(TransactionContext context) throws CommitException {
6264
try {
63-
return super.canOnePhaseCommit(snapshot);
65+
return super.canOnePhaseCommit(context);
6466
} catch (CommitException e) {
65-
cancelGroupCommitIfNeeded(snapshot.getId());
67+
cancelGroupCommitIfNeeded(context.transactionId);
6668
throw e;
6769
}
6870
}
6971

7072
@Override
71-
void onePhaseCommitRecords(Snapshot snapshot)
73+
void onePhaseCommitRecords(TransactionContext context)
7274
throws CommitConflictException, UnknownTransactionStatusException {
73-
cancelGroupCommitIfNeeded(snapshot.getId());
74-
super.onePhaseCommitRecords(snapshot);
75+
cancelGroupCommitIfNeeded(context.transactionId);
76+
super.onePhaseCommitRecords(context);
7577
}
7678

7779
@Override
78-
protected void onFailureBeforeCommit(Snapshot snapshot) {
79-
cancelGroupCommitIfNeeded(snapshot.getId());
80+
protected void onFailureBeforeCommit(TransactionContext context) {
81+
cancelGroupCommitIfNeeded(context.transactionId);
8082
}
8183

82-
private void commitStateViaGroupCommit(Snapshot snapshot)
84+
private void commitStateViaGroupCommit(TransactionContext context)
8385
throws CommitConflictException, UnknownTransactionStatusException {
84-
String id = snapshot.getId();
86+
String id = context.transactionId;
8587
try {
8688
// Group commit the state by internally calling `groupCommitState()` via the emitter.
87-
groupCommitter.ready(id, snapshot);
89+
groupCommitter.ready(id, context);
8890
logger.debug(
8991
"Transaction {} is committed successfully at {}", id, System.currentTimeMillis());
9092
} catch (GroupCommitConflictException e) {
9193
cancelGroupCommitIfNeeded(id);
9294
// Throw a proper exception from this method if needed.
93-
handleCommitConflict(snapshot, e);
95+
handleCommitConflict(context, e);
9496
} catch (GroupCommitException e) {
9597
cancelGroupCommitIfNeeded(id);
9698
Throwable cause = e.getCause();
9799
if (cause instanceof CoordinatorConflictException) {
98100
// Throw a proper exception from this method if needed.
99-
handleCommitConflict(snapshot, (CoordinatorConflictException) cause);
101+
handleCommitConflict(context, (CoordinatorConflictException) cause);
100102
} else {
101103
// Failed to access the coordinator state. The state is unknown.
102104
throw new UnknownTransactionStatusException("Coordinator status is unknown", cause, id);
@@ -118,9 +120,9 @@ private void cancelGroupCommitIfNeeded(String id) {
118120
}
119121

120122
@Override
121-
public void commitState(Snapshot snapshot)
123+
public void commitState(TransactionContext context)
122124
throws CommitConflictException, UnknownTransactionStatusException {
123-
commitStateViaGroupCommit(snapshot);
125+
commitStateViaGroupCommit(context);
124126
}
125127

126128
@Override
@@ -129,25 +131,25 @@ public TransactionState abortState(String id) throws UnknownTransactionStatusExc
129131
return super.abortState(id);
130132
}
131133

132-
private static class Emitter implements Emittable<String, String, Snapshot> {
134+
private static class Emitter implements Emittable<String, String, TransactionContext> {
133135
private final Coordinator coordinator;
134136

135137
public Emitter(Coordinator coordinator) {
136138
this.coordinator = coordinator;
137139
}
138140

139141
@Override
140-
public void emitNormalGroup(String parentId, List<Snapshot> snapshots)
142+
public void emitNormalGroup(String parentId, List<TransactionContext> contexts)
141143
throws CoordinatorException {
142-
if (snapshots.isEmpty()) {
144+
if (contexts.isEmpty()) {
143145
// This means all buffered transactions were manually rolled back. Nothing to do.
144146
return;
145147
}
146148

147149
// These transactions are contained in a normal group that has multiple transactions.
148150
// Therefore, the transaction states should be put together in Coordinator.State.
149151
List<String> transactionIds =
150-
snapshots.stream().map(Snapshot::getId).collect(Collectors.toList());
152+
contexts.stream().map(c -> c.transactionId).collect(Collectors.toList());
151153

152154
coordinator.putStateForGroupCommit(
153155
parentId, transactionIds, TransactionState.COMMITTED, System.currentTimeMillis());
@@ -159,7 +161,8 @@ public void emitNormalGroup(String parentId, List<Snapshot> snapshots)
159161
}
160162

161163
@Override
162-
public void emitDelayedGroup(String fullId, Snapshot snapshot) throws CoordinatorException {
164+
public void emitDelayedGroup(String fullId, TransactionContext context)
165+
throws CoordinatorException {
163166
// This transaction is contained in a delayed group that has only a single transaction.
164167
// Therefore, the transaction state can be committed as if it's a normal commit (not a
165168
// group commit).

core/src/main/java/com/scalar/db/transaction/consensuscommit/ConsensusCommit.java

Lines changed: 30 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -47,17 +47,20 @@
4747
@NotThreadSafe
4848
public class ConsensusCommit extends AbstractDistributedTransaction {
4949
private static final Logger logger = LoggerFactory.getLogger(ConsensusCommit.class);
50+
private final TransactionContext context;
5051
private final CrudHandler crud;
5152
private final CommitHandler commit;
5253
private final ConsensusCommitMutationOperationChecker mutationOperationChecker;
5354
@Nullable private final CoordinatorGroupCommitter groupCommitter;
5455

5556
@SuppressFBWarnings("EI_EXPOSE_REP2")
5657
public ConsensusCommit(
58+
TransactionContext context,
5759
CrudHandler crud,
5860
CommitHandler commit,
5961
ConsensusCommitMutationOperationChecker mutationOperationChecker,
6062
@Nullable CoordinatorGroupCommitter groupCommitter) {
63+
this.context = checkNotNull(context);
6164
this.crud = checkNotNull(crud);
6265
this.commit = checkNotNull(commit);
6366
this.mutationOperationChecker = mutationOperationChecker;
@@ -66,23 +69,23 @@ public ConsensusCommit(
6669

6770
@Override
6871
public String getId() {
69-
return crud.getSnapshot().getId();
72+
return context.transactionId;
7073
}
7174

7275
@Override
7376
public Optional<Result> get(Get get) throws CrudException {
74-
return crud.get(copyAndSetTargetToIfNot(get));
77+
return crud.get(copyAndSetTargetToIfNot(get), context);
7578
}
7679

7780
@Override
7881
public List<Result> scan(Scan scan) throws CrudException {
79-
return crud.scan(copyAndSetTargetToIfNot(scan));
82+
return crud.scan(copyAndSetTargetToIfNot(scan), context);
8083
}
8184

8285
@Override
8386
public Scanner getScanner(Scan scan) throws CrudException {
8487
scan = copyAndSetTargetToIfNot(scan);
85-
return crud.getScanner(scan);
88+
return crud.getScanner(scan, context);
8689
}
8790

8891
/** @deprecated As of release 3.13.0. Will be removed in release 5.0.0. */
@@ -91,7 +94,7 @@ public Scanner getScanner(Scan scan) throws CrudException {
9194
public void put(Put put) throws CrudException {
9295
put = copyAndSetTargetToIfNot(put);
9396
checkMutation(put);
94-
crud.put(put);
97+
crud.put(put, context);
9598
}
9699

97100
/** @deprecated As of release 3.13.0. Will be removed in release 5.0.0. */
@@ -108,7 +111,7 @@ public void put(List<Put> puts) throws CrudException {
108111
public void delete(Delete delete) throws CrudException {
109112
delete = copyAndSetTargetToIfNot(delete);
110113
checkMutation(delete);
111-
crud.delete(delete);
114+
crud.delete(delete, context);
112115
}
113116

114117
/** @deprecated As of release 3.13.0. Will be removed in release 5.0.0. */
@@ -126,15 +129,15 @@ public void insert(Insert insert) throws CrudException {
126129
insert = copyAndSetTargetToIfNot(insert);
127130
Put put = ConsensusCommitUtils.createPutForInsert(insert);
128131
checkMutation(put);
129-
crud.put(put);
132+
crud.put(put, context);
130133
}
131134

132135
@Override
133136
public void upsert(Upsert upsert) throws CrudException {
134137
upsert = copyAndSetTargetToIfNot(upsert);
135138
Put put = ConsensusCommitUtils.createPutForUpsert(upsert);
136139
checkMutation(put);
137-
crud.put(put);
140+
crud.put(put, context);
138141
}
139142

140143
@Override
@@ -144,13 +147,13 @@ public void update(Update update) throws CrudException {
144147
Put put = ConsensusCommitUtils.createPutForUpdate(update);
145148
checkMutation(put);
146149
try {
147-
crud.put(put);
150+
crud.put(put, context);
148151
} catch (UnsatisfiedConditionException e) {
149152
if (update.getCondition().isPresent()) {
150153
throw new UnsatisfiedConditionException(
151154
ConsensusCommitUtils.convertUnsatisfiedConditionExceptionMessageForUpdate(
152155
e, update.getCondition().get()),
153-
crud.getSnapshot().getId());
156+
getId());
154157
}
155158

156159
// If the condition is not specified, it means that the record does not exist. In this case,
@@ -179,13 +182,13 @@ public void mutate(List<? extends Mutation> mutations) throws CrudException {
179182

180183
@Override
181184
public void commit() throws CommitException, UnknownTransactionStatusException {
182-
if (!crud.areAllScannersClosed()) {
185+
if (!context.areAllScannersClosed()) {
183186
throw new IllegalStateException(CoreError.CONSENSUS_COMMIT_SCANNER_NOT_CLOSED.buildMessage());
184187
}
185188

186189
// Execute implicit pre-read
187190
try {
188-
crud.readIfImplicitPreReadEnabled();
191+
crud.readIfImplicitPreReadEnabled(context);
189192
} catch (CrudConflictException e) {
190193
throw new CommitConflictException(
191194
CoreError.CONSENSUS_COMMIT_CONFLICT_OCCURRED_WHILE_IMPLICIT_PRE_READ.buildMessage(),
@@ -197,29 +200,34 @@ public void commit() throws CommitException, UnknownTransactionStatusException {
197200
}
198201

199202
try {
200-
crud.waitForRecoveryCompletionIfNecessary();
203+
crud.waitForRecoveryCompletionIfNecessary(context);
201204
} catch (CrudConflictException e) {
202205
throw new CommitConflictException(e.getMessage(), e, getId());
203206
} catch (CrudException e) {
204207
throw new CommitException(e.getMessage(), e, getId());
205208
}
206209

207-
commit.commit(crud.getSnapshot(), crud.isReadOnly());
210+
commit.commit(context);
208211
}
209212

210213
@Override
211214
public void rollback() {
212215
try {
213-
crud.closeScanners();
216+
context.closeScanners();
214217
} catch (CrudException e) {
215218
logger.warn("Failed to close the scanner", e);
216219
}
217220

218-
if (groupCommitter != null && !crud.isReadOnly()) {
219-
groupCommitter.remove(crud.getSnapshot().getId());
221+
if (groupCommitter != null && !context.readOnly) {
222+
groupCommitter.remove(getId());
220223
}
221224
}
222225

226+
@VisibleForTesting
227+
TransactionContext getTransactionContext() {
228+
return context;
229+
}
230+
223231
@VisibleForTesting
224232
CrudHandler getCrudHandler() {
225233
return crud;
@@ -230,6 +238,11 @@ CommitHandler getCommitHandler() {
230238
return commit;
231239
}
232240

241+
@VisibleForTesting
242+
void waitForRecoveryCompletion() throws CrudException {
243+
crud.waitForRecoveryCompletion(context);
244+
}
245+
233246
private void checkMutation(Mutation mutation) throws CrudException {
234247
try {
235248
mutationOperationChecker.check(mutation);

0 commit comments

Comments
 (0)