Skip to content
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -1310,7 +1310,7 @@ public static class InternalConfig {
public static final String STATE_UPDATER_ENABLED = "__state.updater.enabled__";

public static boolean stateUpdaterEnabled(final Map<String, Object> configs) {
return InternalConfig.getBoolean(configs, InternalConfig.STATE_UPDATER_ENABLED, true);
return true; // always enabled
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I suppose this is temporary and will be removed soon

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes! The config will be removed in a follow-up PR.
I have all the changes ready and will open a PR as
soon as we merge this one.

}

// Private API to enable processing threads (i.e. polling is decoupled from processing)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.StreamsConfig.InternalConfig;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.TaskId;
Expand Down Expand Up @@ -412,39 +411,11 @@ public long position(final TopicPartition partition) {

@ParameterizedTest
@EnumSource(value = Task.TaskType.class, names = {"ACTIVE", "STANDBY"})
public void shouldPollWithRightTimeoutWithStateUpdater(final Task.TaskType type) {
public void shouldPollWithRightTimeout(final Task.TaskType type) {
setupStateManagerMock(type);
setupStoreMetadata();
setupStore();
shouldPollWithRightTimeout(true, type);
}

@ParameterizedTest
@EnumSource(value = Task.TaskType.class, names = {"ACTIVE", "STANDBY"})
public void shouldPollWithRightTimeoutWithoutStateUpdater(final Task.TaskType type) {
setupStateManagerMock(type);
setupStoreMetadata();
setupStore();
shouldPollWithRightTimeout(false, type);
}

private void shouldPollWithRightTimeout(final boolean stateUpdaterEnabled, final Task.TaskType type) {
final Properties properties = new Properties();
properties.put(InternalConfig.STATE_UPDATER_ENABLED, stateUpdaterEnabled);
shouldPollWithRightTimeout(properties, type);
}

@ParameterizedTest
@EnumSource(value = Task.TaskType.class, names = {"ACTIVE", "STANDBY"})
public void shouldPollWithRightTimeoutWithStateUpdaterDefault(final Task.TaskType type) {
setupStateManagerMock(type);
setupStoreMetadata();
setupStore();
final Properties properties = new Properties();
shouldPollWithRightTimeout(properties, type);
}

private void shouldPollWithRightTimeout(final Properties properties, final Task.TaskType type) {
final TaskId taskId = new TaskId(0, 0);

when(storeMetadata.offset()).thenReturn(null).thenReturn(9L);
Expand All @@ -453,8 +424,6 @@ private void shouldPollWithRightTimeout(final Properties properties, final Task.
consumer.updateBeginningOffsets(Collections.singletonMap(tp, 5L));
adminClient.updateEndOffsets(Collections.singletonMap(tp, 11L));

final StreamsConfig config = new StreamsConfig(StreamsTestUtils.getStreamsConfig("test-reader", properties));

final StoreChangelogReader changelogReader =
new StoreChangelogReader(time, config, logContext, adminClient, consumer, callback, standbyListener);

Expand All @@ -465,16 +434,8 @@ private void shouldPollWithRightTimeout(final Properties properties, final Task.
}

changelogReader.restore(Collections.singletonMap(taskId, mock(Task.class)));
if (type == ACTIVE) {
assertEquals(Duration.ofMillis(config.getLong(StreamsConfig.POLL_MS_CONFIG)), consumer.lastPollTimeout());
} else {
if (!properties.containsKey(InternalConfig.STATE_UPDATER_ENABLED)
|| (boolean) properties.get(InternalConfig.STATE_UPDATER_ENABLED)) {
assertEquals(Duration.ofMillis(config.getLong(StreamsConfig.POLL_MS_CONFIG)), consumer.lastPollTimeout());
} else {
assertEquals(Duration.ZERO, consumer.lastPollTimeout());
}
}

assertEquals(Duration.ofMillis(config.getLong(StreamsConfig.POLL_MS_CONFIG)), consumer.lastPollTimeout());
}

@ParameterizedTest
Expand Down
Loading
Loading