Skip to content

Commit

Permalink
Add LoopResources Configuration Support (#232)
Browse files Browse the repository at this point in the history
Motivation:
Enhances flexibility by allowing custom `LoopResources` configuration
for optimized event loop management.

Modifications:
Introduced `LoopResources` configuration option in
`MysqlConnectionConfiguration` and updated relevant documentation.

Result:
Enables performance tuning and flexibility, improving adaptability for
various deployment scenarios. Resolves #229
  • Loading branch information
jchrys authored Feb 14, 2024
1 parent fcf1f18 commit aff4cc5
Show file tree
Hide file tree
Showing 5 changed files with 54 additions and 5 deletions.
4 changes: 4 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,7 @@ ConnectionFactoryOptions options = ConnectionFactoryOptions.builder()
.option(Option.valueOf("tcpKeepAlive"), true) // optional, default false
.option(Option.valueOf("tcpNoDelay"), true) // optional, default false
.option(Option.valueOf("compressionAlgorithms"), "zstd") // optional, default UNCOMPRESSED
.option(Option.valueOf("loopResources"), LoopResources.create("r2dbc")) // optional, default null, null means uses global tcp resources as loopResources (since 1.1.2)
.option(Option.valueOf("autodetectExtensions"), false) // optional, default false
.option(Option.valueOf("passwordPublisher"), Mono.just("password")) // optional, default null, null means has no passwordPublisher (since 1.0.5 / 0.9.6)
.build();
Expand Down Expand Up @@ -194,6 +195,7 @@ MySqlConnectionConfiguration configuration = MySqlConnectionConfiguration.builde
.tcpKeepAlive(true) // optional, controls TCP Keep Alive, default is false
.tcpNoDelay(true) // optional, controls TCP No Delay, default is false
.compressionAlgorithms(CompressionAlgorithm.ZSTD, CompressionAlgotihm.ZLIB) // optional, default is UNCOMPRESSED
.loopResources(LoopResources.create("r2dbc")) // optional, default null, null means uses global tcp resources as loopResources (since 1.1.2)
.autodetectExtensions(false) // optional, controls extension auto-detect, default is true
.extendWith(MyExtension.INSTANCE) // optional, manual extend an extension into extensions, default using auto-detect
.passwordPublisher(Mono.just("password")) // optional, default null, null means has no password publisher (since 1.0.5 / 0.9.6)
Expand Down Expand Up @@ -246,6 +248,7 @@ Mono<Connection> connectionMono = Mono.from(connectionFactory.create());
| useServerPrepareStatement | `true`, `false` or `Predicate<String>` | Optional, default is `false` | See following notice |
| allowLoadLocalInfileInPath | A path | Optional, default is `null` | The path that allows `LOAD DATA LOCAL INFILE` to load file data |
| compressionAlgorithms | A list of `CompressionAlgorithm` | Optional, default is `UNCOMPRESSED` | The compression algorithms for MySQL connection |
| loopResources | A `LoopResources` | Optional, default is `null` | The loop resources for MySQL connection |
| passwordPublisher | A `Publisher<String>` | Optional, default is `null` | The password publisher, see following notice |

- `SslMode` Considers security level and verification for SSL, make sure the database server supports SSL before you want change SSL mode to `REQUIRED` or higher. **The Unix Domain Socket only offers "DISABLED" available**
Expand Down Expand Up @@ -278,6 +281,7 @@ Mono<Connection> connectionMono = Mono.from(connectionFactory.create());
- `ZLIB` Use Zlib compression protocol, it is available on almost all MySQL versions (`5.x` and above)
- `ZSTD` Use Z-standard compression protocol, it is available since MySQL `8.0.18` or above, requires an extern dependency `com.github.luben:zstd-jni`
- For scenarios where the network environment is poor or the amount of data is always large, using a compression protocol may be useful
- `loopResources` Considers loop resources for MySQL connection.

Should use `enum` in [Programmatic](#programmatic-configuration) configuration that not like discovery configurations, except `TlsVersions` (All elements of `TlsVersions` will be always `String` which is case-sensitive).

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import io.netty.handler.ssl.SslContextBuilder;
import org.jetbrains.annotations.Nullable;
import org.reactivestreams.Publisher;
import reactor.netty.resources.LoopResources;
import reactor.netty.tcp.TcpResources;

import javax.net.ssl.HostnameVerifier;
import java.net.Socket;
Expand Down Expand Up @@ -105,6 +107,8 @@ public final class MySqlConnectionConfiguration {

private final int zstdCompressionLevel;

private final LoopResources loopResources;

private final Extensions extensions;

@Nullable
Expand All @@ -119,6 +123,7 @@ private MySqlConnectionConfiguration(
@Nullable Path loadLocalInfilePath, int localInfileBufferSize,
int queryCacheSize, int prepareCacheSize,
Set<CompressionAlgorithm> compressionAlgorithms, int zstdCompressionLevel,
@Nullable LoopResources loopResources,
Extensions extensions, @Nullable Publisher<String> passwordPublisher
) {
this.isHost = isHost;
Expand All @@ -141,6 +146,7 @@ private MySqlConnectionConfiguration(
this.prepareCacheSize = prepareCacheSize;
this.compressionAlgorithms = compressionAlgorithms;
this.zstdCompressionLevel = zstdCompressionLevel;
this.loopResources = loopResources == null? TcpResources.get() : loopResources;
this.extensions = extensions;
this.passwordPublisher = passwordPublisher;
}
Expand Down Expand Up @@ -239,6 +245,10 @@ int getZstdCompressionLevel() {
return zstdCompressionLevel;
}

LoopResources getLoopResources() {
return loopResources;
}

Extensions getExtensions() {
return extensions;
}
Expand Down Expand Up @@ -277,6 +287,7 @@ public boolean equals(Object o) {
prepareCacheSize == that.prepareCacheSize &&
compressionAlgorithms.equals(that.compressionAlgorithms) &&
zstdCompressionLevel == that.zstdCompressionLevel &&
Objects.equals(loopResources, that.loopResources) &&
extensions.equals(that.extensions) &&
Objects.equals(passwordPublisher, that.passwordPublisher);
}
Expand All @@ -286,7 +297,8 @@ public int hashCode() {
return Objects.hash(isHost, domain, port, ssl, tcpKeepAlive, tcpNoDelay, connectTimeout,
serverZoneId, zeroDateOption, user, password, database, createDatabaseIfNotExist,
preferPrepareStatement, loadLocalInfilePath, localInfileBufferSize, queryCacheSize,
prepareCacheSize, compressionAlgorithms, zstdCompressionLevel, extensions, passwordPublisher);
prepareCacheSize, compressionAlgorithms, zstdCompressionLevel, loopResources,
extensions, passwordPublisher);
}

@Override
Expand All @@ -303,6 +315,7 @@ public String toString() {
", queryCacheSize=" + queryCacheSize + ", prepareCacheSize=" + prepareCacheSize +
", compressionAlgorithms=" + compressionAlgorithms +
", zstdCompressionLevel=" + zstdCompressionLevel +
", loopResources=" + loopResources +
", extensions=" + extensions + ", passwordPublisher=" + passwordPublisher + '}';
}

Expand All @@ -317,6 +330,7 @@ public String toString() {
", prepareCacheSize=" + prepareCacheSize +
", compressionAlgorithms=" + compressionAlgorithms +
", zstdCompressionLevel=" + zstdCompressionLevel +
", loopResources=" + loopResources +
", extensions=" + extensions + ", passwordPublisher=" + passwordPublisher + '}';
}

Expand Down Expand Up @@ -393,6 +407,9 @@ public static final class Builder {

private int zstdCompressionLevel = 3;

@Nullable
private LoopResources loopResources;

private boolean autodetectExtensions = true;

private final List<Extension> extensions = new ArrayList<>();
Expand Down Expand Up @@ -425,7 +442,7 @@ public MySqlConnectionConfiguration build() {
connectTimeout, zeroDateOption, serverZoneId, user, password, database,
createDatabaseIfNotExist, preferPrepareStatement, loadLocalInfilePath,
localInfileBufferSize, queryCacheSize, prepareCacheSize,
compressionAlgorithms, zstdCompressionLevel,
compressionAlgorithms, zstdCompressionLevel, loopResources,
Extensions.from(extensions, autodetectExtensions), passwordPublisher);
}

Expand Down Expand Up @@ -911,6 +928,19 @@ public Builder zstdCompressionLevel(int level) {
return this;
}

/**
* Configures the {@link LoopResources} for the driver.
* Default to {@link TcpResources#get() global tcp resources}.
* @param loopResources the {@link LoopResources}.
* @return this {@link Builder}.
* @throws IllegalArgumentException if {@code loopResources} is {@code null}.
* @since 1.1.2
*/
public Builder loopResources(LoopResources loopResources) {
this.loopResources = requireNonNull(loopResources, "loopResources must not be null");
return this;
}

/**
* Configures whether to use {@link ServiceLoader} to discover and register extensions. Defaults to
* {@code true}.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ private static Mono<MySqlConnection> getMySqlConnection(
final int prepareCacheSize,
@Nullable final CharSequence password) {
return Client.connect(ssl, address, configuration.isTcpKeepAlive(), configuration.isTcpNoDelay(),
context, configuration.getConnectTimeout())
context, configuration.getConnectTimeout(), configuration.getLoopResources())
.flatMap(client -> {
// Lazy init database after handshake/login
String db = createDbIfNotExist ? "" : database;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import io.r2dbc.spi.ConnectionFactoryProvider;
import io.r2dbc.spi.Option;
import org.reactivestreams.Publisher;
import reactor.netty.resources.LoopResources;

import javax.net.ssl.HostnameVerifier;
import java.time.Duration;
Expand Down Expand Up @@ -217,6 +218,14 @@ public final class MySqlConnectionFactoryProvider implements ConnectionFactoryPr
public static final Option<Integer> ZSTD_COMPRESSION_LEVEL =
Option.valueOf("zstdCompressionLevel");

/**
* Option to set the {@link LoopResources} for the connection.
* Default to {@link reactor.netty.tcp.TcpResources#get() global tcp Resources}
*
* @since 1.1.2
*/
public static final Option<LoopResources> LOOP_RESOURCES = Option.valueOf("loopResources");

/**
* Option to set the maximum size of the {@link Query} parsing cache. Default to {@code 256}.
*
Expand Down Expand Up @@ -312,6 +321,8 @@ static MySqlConnectionConfiguration setup(ConnectionFactoryOptions options) {
).to(builder::compressionAlgorithms);
mapper.optional(ZSTD_COMPRESSION_LEVEL).asInt()
.to(builder::zstdCompressionLevel);
mapper.optional(LOOP_RESOURCES).as(LoopResources.class)
.to(builder::loopResources);
mapper.optional(PASSWORD_PUBLISHER).as(Publisher.class)
.to(builder::passwordPublisher);

Expand Down
8 changes: 6 additions & 2 deletions src/main/java/io/asyncer/r2dbc/mysql/client/Client.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.SynchronousSink;
import reactor.netty.resources.LoopResources;
import reactor.netty.tcp.TcpClient;

import java.net.InetSocketAddress;
Expand Down Expand Up @@ -116,17 +117,20 @@ public interface Client {
* @param tcpNoDelay if enable the {@link ChannelOption#TCP_NODELAY}
* @param context the connection context
* @param connectTimeout connect timeout, or {@code null} if it has no timeout
* @param loopResources the loop resources to use
* @return A {@link Mono} that will emit a connected {@link Client}.
* @throws IllegalArgumentException if {@code ssl}, {@code address} or {@code context} is {@code null}.
* @throws ArithmeticException if {@code connectTimeout} milliseconds overflow as an int
*/
static Mono<Client> connect(MySqlSslConfiguration ssl, SocketAddress address, boolean tcpKeepAlive,
boolean tcpNoDelay, ConnectionContext context, @Nullable Duration connectTimeout) {
boolean tcpNoDelay, ConnectionContext context, @Nullable Duration connectTimeout,
LoopResources loopResources) {
requireNonNull(ssl, "ssl must not be null");
requireNonNull(address, "address must not be null");
requireNonNull(context, "context must not be null");

TcpClient tcpClient = TcpClient.newConnection();
TcpClient tcpClient = TcpClient.newConnection()
.runOn(loopResources);

if (connectTimeout != null) {
tcpClient = tcpClient.option(ChannelOption.CONNECT_TIMEOUT_MILLIS,
Expand Down

0 comments on commit aff4cc5

Please sign in to comment.