Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import io.netty.resolver.AddressResolverGroup;
import org.jetbrains.annotations.Nullable;
import org.reactivestreams.Publisher;
import reactor.netty.internal.util.Metrics;
import reactor.netty.resources.LoopResources;
import reactor.netty.tcp.TcpResources;

Expand Down Expand Up @@ -131,6 +132,8 @@ public final class MySqlConnectionConfiguration {
@Nullable
private final AddressResolverGroup<?> resolver;

private final boolean metrics;

private MySqlConnectionConfiguration(
boolean isHost, String domain, int port, MySqlSslConfiguration ssl,
boolean tcpKeepAlive, boolean tcpNoDelay, @Nullable Duration connectTimeout,
Expand All @@ -146,7 +149,8 @@ private MySqlConnectionConfiguration(
Set<CompressionAlgorithm> compressionAlgorithms, int zstdCompressionLevel,
@Nullable LoopResources loopResources,
Extensions extensions, @Nullable Publisher<String> passwordPublisher,
@Nullable AddressResolverGroup<?> resolver
@Nullable AddressResolverGroup<?> resolver,
boolean metrics
) {
this.isHost = isHost;
this.domain = domain;
Expand Down Expand Up @@ -177,6 +181,7 @@ private MySqlConnectionConfiguration(
this.extensions = extensions;
this.passwordPublisher = passwordPublisher;
this.resolver = resolver;
this.metrics = metrics;
}

/**
Expand Down Expand Up @@ -312,6 +317,10 @@ AddressResolverGroup<?> getResolver() {
return resolver;
}

boolean isMetrics() {
return metrics;
}

@Override
public boolean equals(Object o) {
if (this == o) {
Expand Down Expand Up @@ -349,7 +358,8 @@ public boolean equals(Object o) {
Objects.equals(loopResources, that.loopResources) &&
extensions.equals(that.extensions) &&
Objects.equals(passwordPublisher, that.passwordPublisher) &&
Objects.equals(resolver, that.resolver);
Objects.equals(resolver, that.resolver) &&
metrics == that.metrics;
}

@Override
Expand All @@ -364,7 +374,7 @@ public int hashCode() {
loadLocalInfilePath, localInfileBufferSize,
queryCacheSize, prepareCacheSize,
compressionAlgorithms, zstdCompressionLevel,
loopResources, extensions, passwordPublisher, resolver);
loopResources, extensions, passwordPublisher, resolver, metrics);
}

@Override
Expand Down Expand Up @@ -398,7 +408,8 @@ private String buildCommonToStringPart() {
", loopResources=" + loopResources +
", extensions=" + extensions +
", passwordPublisher=" + passwordPublisher +
", resolver=" + resolver;
", resolver=" + resolver +
", metrics=" + metrics;
}

/**
Expand Down Expand Up @@ -498,6 +509,8 @@ public static final class Builder {
@Nullable
private AddressResolverGroup<?> resolver;

private boolean metrics;

/**
* Builds an immutable {@link MySqlConnectionConfiguration} with current options.
*
Expand Down Expand Up @@ -532,7 +545,7 @@ public MySqlConnectionConfiguration build() {
loadLocalInfilePath,
localInfileBufferSize, queryCacheSize, prepareCacheSize,
compressionAlgorithms, zstdCompressionLevel, loopResources,
Extensions.from(extensions, autodetectExtensions), passwordPublisher, resolver);
Extensions.from(extensions, autodetectExtensions), passwordPublisher, resolver, metrics);
}

/**
Expand Down Expand Up @@ -1175,6 +1188,25 @@ public Builder resolver(AddressResolverGroup<?> resolver) {
return this;
}

/**
* Option to enable metrics to be collected and registered in Micrometer's globalRegistry
* with {@link reactor.netty.tcp.TcpClient#metrics(boolean)}. Defaults to {@code false}.
* <p>
* Note: It is required to add {@code io.micrometer.micrometer-core} dependency to classpath.
*
* @param enabled enable metrics for {@link reactor.netty.tcp.TcpClient}.
* @return this {@link Builder}
* @throws IllegalArgumentException if {@code io.micrometer:micrometer-core} is not on the classpath.
* @since 1.3.2
*/
public Builder metrics(boolean enabled) {
require(!enabled || Metrics.isMicrometerAvailable(),
"dependency `io.micrometer:micrometer-core` must be added to classpath if metrics enabled"
);
this.metrics = enabled;
return this;
}

private SslMode requireSslMode() {
SslMode sslMode = this.sslMode;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,8 @@ private static Mono<MySqlConnection> getMySqlConnection(
context,
configuration.getConnectTimeout(),
configuration.getLoopResources(),
configuration.getResolver()
configuration.getResolver(),
configuration.isMetrics()
)).flatMap(client -> {
// Lazy init database after handshake/login
boolean deferDatabase = configuration.isCreateDatabaseIfNotExist();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -320,6 +320,16 @@ public final class MySqlConnectionFactoryProvider implements ConnectionFactoryPr
*/
public static final Option<AddressResolverGroup<?>> RESOLVER = Option.valueOf("resolver");

/**
* Option to enable metrics to be collected and registered in Micrometer's globalRegistry
* with {@link reactor.netty.tcp.TcpClient#metrics(boolean)}. Defaults to {@code false}.
* <p>
* Note: It is required to add {@code io.micrometer.micrometer-core} dependency to classpath.
*
* @since 1.3.2
*/
public static final Option<Boolean> METRICS = Option.valueOf("metrics");

@Override
public ConnectionFactory create(ConnectionFactoryOptions options) {
requireNonNull(options, "connectionFactoryOptions must not be null");
Expand Down Expand Up @@ -413,6 +423,8 @@ static MySqlConnectionConfiguration setup(ConnectionFactoryOptions options) {
.to(builder::lockWaitTimeout);
mapper.optional(STATEMENT_TIMEOUT).as(Duration.class, Duration::parse)
.to(builder::statementTimeout);
mapper.optional(METRICS).asBoolean()
.to(builder::metrics);

return builder.build();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,19 +127,21 @@ public interface Client {
* @param context the connection context
* @param connectTimeout connect timeout, or {@code null} if it has no timeout
* @param loopResources the loop resources to use
* @param metrics if enable the {@link TcpClient#metrics)}
* @return A {@link Mono} that will emit a connected {@link Client}.
* @throws IllegalArgumentException if {@code ssl}, {@code address} or {@code context} is {@code null}.
* @throws ArithmeticException if {@code connectTimeout} milliseconds overflow as an int
*/
static Mono<Client> connect(MySqlSslConfiguration ssl, SocketAddress address, boolean tcpKeepAlive,
boolean tcpNoDelay, ConnectionContext context, @Nullable Duration connectTimeout,
LoopResources loopResources, @Nullable AddressResolverGroup<?> resolver) {
LoopResources loopResources, @Nullable AddressResolverGroup<?> resolver, boolean metrics) {
requireNonNull(ssl, "ssl must not be null");
requireNonNull(address, "address must not be null");
requireNonNull(context, "context must not be null");

TcpClient tcpClient = TcpClient.newConnection()
.runOn(loopResources);
.runOn(loopResources)
.metrics(metrics);

if (connectTimeout != null) {
tcpClient = tcpClient.option(ChannelOption.CONNECT_TIMEOUT_MILLIS,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,19 @@ void validResolver() {
assertThat(resolverGroup).isSameAs(resolver);
}

@Test
void invalidMetrics() {
// throw exception when metrics true without micrometer-core dependency
assertThatIllegalArgumentException().isThrownBy(() ->
MySqlConnectionConfiguration
.builder()
.host(HOST)
.user(USER)
.metrics(true)
.build()
);
}

private static MySqlConnectionConfiguration unixSocketSslMode(SslMode sslMode) {
return MySqlConnectionConfiguration.builder()
.unixSocket(UNIX_SOCKET)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static io.asyncer.r2dbc.mysql.MySqlConnectionFactoryProvider.METRICS;
import static io.asyncer.r2dbc.mysql.MySqlConnectionFactoryProvider.PASSWORD_PUBLISHER;
import static io.asyncer.r2dbc.mysql.MySqlConnectionFactoryProvider.RESOLVER;
import static io.asyncer.r2dbc.mysql.MySqlConnectionFactoryProvider.USE_SERVER_PREPARE_STATEMENT;
Expand Down Expand Up @@ -469,6 +470,18 @@ void validResolver() {
assertThat(ConnectionFactories.get(options)).isExactlyInstanceOf(MySqlConnectionFactory.class);
}

@Test
void invalidMetrics() {
// throw exception when metrics true without micrometer-core dependency
assertThatIllegalArgumentException().isThrownBy(() ->
ConnectionFactories.get(ConnectionFactoryOptions.builder()
.option(DRIVER, "mysql")
.option(HOST, "127.0.0.1")
.option(USER, "root")
.option(METRICS, true)
.build()));
}

@Test
void allConfigurationOptions() {
List<String> exceptConfigs = Arrays.asList(
Expand Down
Loading