|
1 | 1 | package com.scalar.db.transaction.consensuscommit; |
2 | 2 |
|
3 | 3 | import com.google.common.collect.ImmutableMap; |
| 4 | +import com.scalar.db.api.AndConditionSet; |
4 | 5 | import com.scalar.db.api.ConditionBuilder; |
| 6 | +import com.scalar.db.api.ConditionSetBuilder; |
| 7 | +import com.scalar.db.api.ConditionalExpression; |
| 8 | +import com.scalar.db.api.Consistency; |
| 9 | +import com.scalar.db.api.Get; |
| 10 | +import com.scalar.db.api.GetBuilder; |
5 | 11 | import com.scalar.db.api.Insert; |
| 12 | +import com.scalar.db.api.LikeExpression; |
6 | 13 | import com.scalar.db.api.MutationCondition; |
7 | 14 | import com.scalar.db.api.Operation; |
8 | 15 | import com.scalar.db.api.Put; |
9 | 16 | import com.scalar.db.api.PutBuilder; |
| 17 | +import com.scalar.db.api.Scan; |
| 18 | +import com.scalar.db.api.ScanBuilder; |
| 19 | +import com.scalar.db.api.Selection; |
10 | 20 | import com.scalar.db.api.TableMetadata; |
11 | 21 | import com.scalar.db.api.Update; |
12 | 22 | import com.scalar.db.api.UpdateIf; |
@@ -352,4 +362,149 @@ static TransactionTableMetadata getTransactionTableMetadata( |
352 | 362 | } |
353 | 363 | return metadata; |
354 | 364 | } |
| 365 | + |
| 366 | + static Get prepareGetForStorage(Get get, TableMetadata metadata) { |
| 367 | + GetBuilder.BuildableGetOrGetWithIndexFromExisting builder = |
| 368 | + Get.newBuilder(get).clearProjections().consistency(Consistency.LINEARIZABLE); |
| 369 | + |
| 370 | + if (!get.getConjunctions().isEmpty()) { |
| 371 | + // If there are conjunctions, we need to convert them to include conditions on the before |
| 372 | + // image |
| 373 | + Set<AndConditionSet> converted = convertConjunctions(get.getConjunctions(), metadata); |
| 374 | + return builder.clearConditions().whereOr(converted).build(); |
| 375 | + } |
| 376 | + |
| 377 | + return builder.build(); |
| 378 | + } |
| 379 | + |
| 380 | + static Scan prepareScanForStorage(Scan scan, TableMetadata metadata) { |
| 381 | + ScanBuilder.BuildableScanOrScanAllFromExisting builder = |
| 382 | + Scan.newBuilder(scan).clearProjections().consistency(Consistency.LINEARIZABLE); |
| 383 | + |
| 384 | + if (scan.getLimit() > 0) { |
| 385 | + // Since the recovery process and the conjunction processing may exclude some records from |
| 386 | + // the scan result, it is necessary to perform the scan without a limit. |
| 387 | + builder.limit(0); |
| 388 | + } |
| 389 | + |
| 390 | + if (!scan.getConjunctions().isEmpty()) { |
| 391 | + // If there are conjunctions, we need to convert them to include conditions on the before |
| 392 | + // image |
| 393 | + Set<AndConditionSet> converted = convertConjunctions(scan.getConjunctions(), metadata); |
| 394 | + return builder.clearConditions().whereOr(converted).build(); |
| 395 | + } |
| 396 | + |
| 397 | + return builder.build(); |
| 398 | + } |
| 399 | + |
| 400 | + /** |
| 401 | + * Converts the given conjunctions to include conditions on before images. |
| 402 | + * |
| 403 | + * <p>This is necessary because we might miss prepared records whose before images match the |
| 404 | + * original conditions when reading from storage. For example, suppose we have the following |
| 405 | + * records in storage: |
| 406 | + * |
| 407 | + * <pre> |
| 408 | + * | partition_key | clustering_key | column | status | before_column | before_status | |
| 409 | + * |---------------|----------------|--------|-----------|---------------|----------------| |
| 410 | + * | 0 | 0 | 1000 | COMMITTED | | | |
| 411 | + * | 0 | 1 | 200 | PREPARED | 1000 | COMMITTED | |
| 412 | + * </pre> |
| 413 | + * |
| 414 | + * If we scan records with the condition "column = 1000" without converting the condition |
| 415 | + * (conjunction), we only get the first record, not the second one, because the condition does not |
| 416 | + * match. However, the second record has not been committed yet, so we should still retrieve it, |
| 417 | + * considering the possibility that the record will be rolled back. |
| 418 | + * |
| 419 | + * <p>To handle such cases, we convert the conjunctions to include conditions on the before image. |
| 420 | + * For example, if the original condition is: |
| 421 | + * |
| 422 | + * <pre> |
| 423 | + * column = 1000 |
| 424 | + * </pre> |
| 425 | + * |
| 426 | + * We convert it to: |
| 427 | + * |
| 428 | + * <pre> |
| 429 | + * column = 1000 OR before_column = 1000 |
| 430 | + * </pre> |
| 431 | + * |
| 432 | + * <p>Here are more examples: |
| 433 | + * |
| 434 | + * <p>Example 1: |
| 435 | + * |
| 436 | + * <pre> |
| 437 | + * {@code column >= 500 AND column < 1000} |
| 438 | + * </pre> |
| 439 | + * |
| 440 | + * becomes: |
| 441 | + * |
| 442 | + * <pre> |
| 443 | + * {@code (column >= 500 AND column < 1000) OR (before_column >= 500 AND before_column < 1000)} |
| 444 | + * </pre> |
| 445 | + * |
| 446 | + * <p>Example 2: |
| 447 | + * |
| 448 | + * <pre> |
| 449 | + * {@code column1 = 500 OR column2 != 1000} |
| 450 | + * </pre> |
| 451 | + * |
| 452 | + * becomes: |
| 453 | + * |
| 454 | + * <pre> |
| 455 | + * {@code column1 = 500 OR column2 != 1000 OR before_column1 = 500 OR before_column2 != 1000} |
| 456 | + * </pre> |
| 457 | + * |
| 458 | + * This way, we can ensure that prepared records whose before images satisfy the original scan |
| 459 | + * conditions are not missed during the scan. |
| 460 | + * |
| 461 | + * @param conjunctions the conjunctions to convert |
| 462 | + * @param metadata the table metadata of the target table |
| 463 | + * @return the converted conjunctions |
| 464 | + */ |
| 465 | + private static Set<AndConditionSet> convertConjunctions( |
| 466 | + Set<Selection.Conjunction> conjunctions, TableMetadata metadata) { |
| 467 | + Set<AndConditionSet> converted = new HashSet<>(conjunctions.size() * 2); |
| 468 | + |
| 469 | + // Keep the original conjunctions |
| 470 | + conjunctions.forEach( |
| 471 | + c -> converted.add(ConditionSetBuilder.andConditionSet(c.getConditions()).build())); |
| 472 | + |
| 473 | + // Add conditions on the before image |
| 474 | + for (Selection.Conjunction conjunction : conjunctions) { |
| 475 | + Set<ConditionalExpression> conditions = new HashSet<>(conjunction.getConditions().size()); |
| 476 | + for (ConditionalExpression condition : conjunction.getConditions()) { |
| 477 | + String columnName = condition.getColumn().getName(); |
| 478 | + |
| 479 | + if (metadata.getPartitionKeyNames().contains(columnName) |
| 480 | + || metadata.getClusteringKeyNames().contains(columnName)) { |
| 481 | + // If the condition is on the primary key, we don't need to convert it |
| 482 | + conditions.add(condition); |
| 483 | + continue; |
| 484 | + } |
| 485 | + |
| 486 | + // Convert the condition to use the before image column |
| 487 | + ConditionalExpression convertedCondition; |
| 488 | + if (condition instanceof LikeExpression) { |
| 489 | + LikeExpression likeExpression = (LikeExpression) condition; |
| 490 | + convertedCondition = |
| 491 | + ConditionBuilder.buildLikeExpression( |
| 492 | + likeExpression.getColumn().copyWith(Attribute.BEFORE_PREFIX + columnName), |
| 493 | + likeExpression.getOperator(), |
| 494 | + likeExpression.getEscape()); |
| 495 | + } else { |
| 496 | + convertedCondition = |
| 497 | + ConditionBuilder.buildConditionalExpression( |
| 498 | + condition.getColumn().copyWith(Attribute.BEFORE_PREFIX + columnName), |
| 499 | + condition.getOperator()); |
| 500 | + } |
| 501 | + |
| 502 | + conditions.add(convertedCondition); |
| 503 | + } |
| 504 | + |
| 505 | + converted.add(ConditionSetBuilder.andConditionSet(conditions).build()); |
| 506 | + } |
| 507 | + |
| 508 | + return converted; |
| 509 | + } |
355 | 510 | } |
0 commit comments