Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ public class ConsensusCommitConfig {

public static final String COORDINATOR_WRITE_OMISSION_ON_READ_ONLY_ENABLED =
PREFIX + "coordinator.write_omission_on_read_only.enabled";
public static final String RECOVERY_EXECUTOR_COUNT = PREFIX + "recovery_executor_count";
public static final String PARALLEL_IMPLICIT_PRE_READ =
PREFIX + "parallel_implicit_pre_read.enabled";
public static final String INCLUDE_METADATA_ENABLED = PREFIX + "include_metadata.enabled";
Expand All @@ -57,7 +56,6 @@ public class ConsensusCommitConfig {
COORDINATOR_GROUP_COMMIT_PREFIX + "metrics_monitor_log_enabled";

public static final int DEFAULT_PARALLEL_EXECUTOR_COUNT = 128;
public static final int DEFAULT_RECOVERY_EXECUTOR_COUNT = 128;

public static final int DEFAULT_COORDINATOR_GROUP_COMMIT_SLOT_CAPACITY = 20;
public static final int DEFAULT_COORDINATOR_GROUP_COMMIT_GROUP_SIZE_FIX_TIMEOUT_MILLIS = 40;
Expand All @@ -77,7 +75,6 @@ public class ConsensusCommitConfig {
private final boolean asyncRollbackEnabled;

private final boolean coordinatorWriteOmissionOnReadOnlyEnabled;
private final int recoveryExecutorCount;
private final boolean parallelImplicitPreReadEnabled;
private final boolean isIncludeMetadataEnabled;

Expand Down Expand Up @@ -148,9 +145,6 @@ public ConsensusCommitConfig(DatabaseConfig databaseConfig) {
coordinatorWriteOmissionOnReadOnlyEnabled =
getBoolean(properties, COORDINATOR_WRITE_OMISSION_ON_READ_ONLY_ENABLED, true);

recoveryExecutorCount =
getInt(properties, RECOVERY_EXECUTOR_COUNT, DEFAULT_RECOVERY_EXECUTOR_COUNT);

isIncludeMetadataEnabled = getBoolean(properties, INCLUDE_METADATA_ENABLED, false);

parallelImplicitPreReadEnabled = getBoolean(properties, PARALLEL_IMPLICIT_PRE_READ, true);
Expand Down Expand Up @@ -225,10 +219,6 @@ public boolean isCoordinatorWriteOmissionOnReadOnlyEnabled() {
return coordinatorWriteOmissionOnReadOnlyEnabled;
}

public int getRecoveryExecutorCount() {
return recoveryExecutorCount;
}

public boolean isParallelImplicitPreReadEnabled() {
return parallelImplicitPreReadEnabled;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,9 +72,7 @@ public ConsensusCommitManager(
new TransactionTableMetadataManager(
admin, databaseConfig.getMetadataCacheExpirationTimeSecs());
RecoveryHandler recovery = new RecoveryHandler(storage, coordinator, tableMetadataManager);
recoveryExecutor =
new RecoveryExecutor(
coordinator, recovery, tableMetadataManager, config.getRecoveryExecutorCount());
recoveryExecutor = new RecoveryExecutor(coordinator, recovery, tableMetadataManager);
groupCommitter = CoordinatorGroupCommitter.from(config).orElse(null);
commit = createCommitHandler();
isIncludeMetadataEnabled = config.isIncludeMetadataEnabled();
Expand All @@ -94,9 +92,7 @@ protected ConsensusCommitManager(DatabaseConfig databaseConfig) {
new TransactionTableMetadataManager(
admin, databaseConfig.getMetadataCacheExpirationTimeSecs());
RecoveryHandler recovery = new RecoveryHandler(storage, coordinator, tableMetadataManager);
recoveryExecutor =
new RecoveryExecutor(
coordinator, recovery, tableMetadataManager, config.getRecoveryExecutorCount());
recoveryExecutor = new RecoveryExecutor(coordinator, recovery, tableMetadataManager);
groupCommitter = CoordinatorGroupCommitter.from(config).orElse(null);
commit = createCommitHandler();
isIncludeMetadataEnabled = config.isIncludeMetadataEnabled();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,14 +49,12 @@ public class RecoveryExecutor implements AutoCloseable {
public RecoveryExecutor(
Coordinator coordinator,
RecoveryHandler recovery,
TransactionTableMetadataManager tableMetadataManager,
int threadPoolSize) {
TransactionTableMetadataManager tableMetadataManager) {
this.coordinator = Objects.requireNonNull(coordinator);
this.recovery = Objects.requireNonNull(recovery);
this.tableMetadataManager = Objects.requireNonNull(tableMetadataManager);
executorService =
Executors.newFixedThreadPool(
threadPoolSize,
Executors.newCachedThreadPool(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So, it cannot set the max size?

Copy link
Collaborator Author

@brfrn169 brfrn169 Jun 20, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we use the Java standard API Executors.newCachedThreadPool(), we cannot set a maximum pool size.

The implementation of Executors.newCachedThreadPool() is as follows:

    public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>(),
                                      threadFactory);
    }

If we want to set a maximum size, we can do something like this:

                   new ThreadPoolExecutor(0, <MAX_SIZE>,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>(),
                                      threadFactory);

But for now, as discussed, we’ve decided to use Executors.newCachedThreadPool() and see how it performs during benchmarking.

new ThreadFactoryBuilder()
.setNameFormat("recovery-executor-%d")
.setDaemon(true)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,9 +74,7 @@ public TwoPhaseConsensusCommitManager(
coordinator = new Coordinator(storage, config);
parallelExecutor = new ParallelExecutor(config);
RecoveryHandler recovery = new RecoveryHandler(storage, coordinator, tableMetadataManager);
recoveryExecutor =
new RecoveryExecutor(
coordinator, recovery, tableMetadataManager, config.getRecoveryExecutorCount());
recoveryExecutor = new RecoveryExecutor(coordinator, recovery, tableMetadataManager);
commit =
new CommitHandler(
storage,
Expand All @@ -100,9 +98,7 @@ public TwoPhaseConsensusCommitManager(DatabaseConfig databaseConfig) {
coordinator = new Coordinator(storage, config);
parallelExecutor = new ParallelExecutor(config);
RecoveryHandler recovery = new RecoveryHandler(storage, coordinator, tableMetadataManager);
recoveryExecutor =
new RecoveryExecutor(
coordinator, recovery, tableMetadataManager, config.getRecoveryExecutorCount());
recoveryExecutor = new RecoveryExecutor(coordinator, recovery, tableMetadataManager);
commit =
new CommitHandler(
storage,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ public void constructor_NoPropertiesGiven_ShouldLoadAsDefaultValues() {
assertThat(config.isAsyncCommitEnabled()).isFalse();
assertThat(config.isAsyncRollbackEnabled()).isFalse();
assertThat(config.isCoordinatorWriteOmissionOnReadOnlyEnabled()).isTrue();
assertThat(config.getRecoveryExecutorCount()).isEqualTo(128);
assertThat(config.isParallelImplicitPreReadEnabled()).isTrue();
assertThat(config.isIncludeMetadataEnabled()).isFalse();
}
Expand Down Expand Up @@ -160,19 +159,6 @@ public void constructor_AsyncExecutionRelatedPropertiesGiven_ShouldLoadProperly(
constructor_PropertiesWithCoordinatorWriteOmissionOnReadOnlyEnabledGiven_ShouldLoadProperly() {
// Arrange
Properties props = new Properties();
props.setProperty(ConsensusCommitConfig.RECOVERY_EXECUTOR_COUNT, "256");

// Act
ConsensusCommitConfig config = new ConsensusCommitConfig(new DatabaseConfig(props));

// Assert
assertThat(config.getRecoveryExecutorCount()).isEqualTo(256);
}

@Test
public void constructor_PropertiesWithRecoveryExecutorCountGiven_ShouldLoadProperly() {
// Arrange
Properties props = new Properties();
props.setProperty(
ConsensusCommitConfig.COORDINATOR_WRITE_OMISSION_ON_READ_ONLY_ENABLED, "false");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ public class RecoveryExecutorTest {
public void setUp() throws Exception {
MockitoAnnotations.openMocks(this).close();

executor = new RecoveryExecutor(coordinator, recovery, tableMetadataManager, 1);
executor = new RecoveryExecutor(coordinator, recovery, tableMetadataManager);

// Arrange
when(tableMetadataManager.getTransactionTableMetadata(selection))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,12 +138,7 @@ public void setUp() throws Exception {
TransactionTableMetadataManager tableMetadataManager =
new TransactionTableMetadataManager(admin, -1);
recovery = spy(new RecoveryHandler(storage, coordinator, tableMetadataManager));
recoveryExecutor =
new RecoveryExecutor(
coordinator,
recovery,
tableMetadataManager,
consensusCommitConfig.getRecoveryExecutorCount());
recoveryExecutor = new RecoveryExecutor(coordinator, recovery, tableMetadataManager);
groupCommitter = CoordinatorGroupCommitter.from(consensusCommitConfig).orElse(null);
CommitHandler commit = spy(createCommitHandler(tableMetadataManager, groupCommitter));
manager =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -179,12 +179,7 @@ public void setUp() throws Exception {
TransactionTableMetadataManager tableMetadataManager =
new TransactionTableMetadataManager(admin, -1);
recovery = spy(new RecoveryHandler(storage, coordinator, tableMetadataManager));
recoveryExecutor =
new RecoveryExecutor(
coordinator,
recovery,
tableMetadataManager,
consensusCommitConfig.getRecoveryExecutorCount());
recoveryExecutor = new RecoveryExecutor(coordinator, recovery, tableMetadataManager);
groupCommitter = CoordinatorGroupCommitter.from(consensusCommitConfig).orElse(null);
commit = spy(createCommitHandler(tableMetadataManager, groupCommitter));
manager =
Expand Down
Loading