Skip to content

Commit

Permalink
Fix TLS doesn't work + Add tests for TLS connection communication
Browse files Browse the repository at this point in the history
  • Loading branch information
JamesChenX committed Feb 23, 2024
1 parent 279a915 commit 47f1962
Show file tree
Hide file tree
Showing 13 changed files with 567 additions and 135 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,12 @@
package im.turms.gateway.access.client.tcp;

import java.net.InetSocketAddress;
import jakarta.annotation.Nullable;

import io.netty.channel.Channel;
import io.netty.channel.ChannelPipeline;
import reactor.core.publisher.Sinks;
import reactor.netty.Connection;
import reactor.netty.DisposableServer;
import reactor.netty.NettyPipeline;
import reactor.netty.tcp.TcpServer;

import im.turms.gateway.access.client.common.channel.ServiceAvailabilityHandler;
Expand All @@ -37,6 +37,7 @@
import im.turms.server.common.infra.healthcheck.ServerStatusManager;
import im.turms.server.common.infra.metrics.TurmsMicrometerChannelMetricsRecorder;
import im.turms.server.common.infra.net.BindException;
import im.turms.server.common.infra.net.SslContextSpecType;
import im.turms.server.common.infra.net.SslUtil;
import im.turms.server.common.infra.property.constant.RemoteAddressSourceProxyProtocolMode;
import im.turms.server.common.infra.property.env.common.SslProperties;
Expand All @@ -56,7 +57,6 @@ public final class TcpServerFactory {
private TcpServerFactory() {
}

@Nullable
public static DisposableServer create(
TcpProperties tcpProperties,
BlocklistService blocklistService,
Expand All @@ -73,6 +73,8 @@ public static DisposableServer create(
RemoteAddressSourceProxyProtocolMode proxyProtocolMode =
tcpProperties.getRemoteAddressSource()
.getProxyProtocolMode();

Sinks.One<InetSocketAddress> remoteAddressSink = Sinks.one();
TcpServer server = TcpServer.create()
.host(host)
.port(port)
Expand All @@ -90,15 +92,14 @@ public static DisposableServer create(
() -> new TurmsMicrometerChannelMetricsRecorder(
MetricNameConst.CLIENT_NETWORK,
"tcp"))
.doOnChannelInit((connectionObserver, channel, remoteAddress) -> channel.pipeline()
.addFirst("serviceAvailabilityHandler", serviceAvailabilityHandler))
.handle((in, out) -> {
Connection connection = (Connection) in;
Sinks.One<InetSocketAddress> remoteAddressSink = Sinks.one();
// Called for every new connection that is opened.
.doOnChannelInit((connectionObserver, channel, remoteAddress) -> {
ChannelPipeline pipeline = channel.pipeline();
pipeline.addFirst("serviceAvailabilityHandler", serviceAvailabilityHandler);
// Inbound
connection.addHandlerLast("varintLengthBasedFrameDecoder",
pipeline.addBefore(NettyPipeline.ReactiveBridge,
"varintLengthBasedFrameDecoder",
CodecFactory.getExtendedVarintLengthBasedFrameDecoder(maxFrameLength));
Channel channel = connection.channel();
if (RemoteAddressSourceProxyProtocolMode.REQUIRED == proxyProtocolMode) {
HAProxyUtil.addProxyProtocolHandlers(channel.pipeline(), address -> {
if (blocklistService.isIpBlocked(address.getAddress()
Expand All @@ -108,8 +109,6 @@ public static DisposableServer create(
remoteAddressSink.tryEmitValue(address);
}
});
channel.config()
.setAutoRead(true);
} else if (RemoteAddressSourceProxyProtocolMode.OPTIONAL == proxyProtocolMode) {
HAProxyUtil.addProxyProtocolDetectorHandler(channel.pipeline(), address -> {
if (blocklistService.isIpBlocked(address.getAddress()
Expand All @@ -119,22 +118,35 @@ public static DisposableServer create(
remoteAddressSink.tryEmitValue(address);
}
});
channel.config()
.setAutoRead(true);
} else {
remoteAddressSink.tryEmitValue((InetSocketAddress) channel.remoteAddress());
}

// Outbound
connection.addHandlerLast("varintLengthFieldPrepender",
pipeline.addLast("varintLengthFieldPrepender",
CodecFactory.getVarintLengthFieldPrepender());
// For advanced operations, they encode objects to buffers themselves,
// "protobufFrameEncoder" will just ignore them. But some simple
// operations will pass TurmsNotification instances down, so we still need to
// encode them.
connection.addHandlerLast("protobufFrameEncoder",
pipeline.addLast("protobufFrameEncoder",
CodecFactory.getProtobufFrameEncoder());

})
// Called when a connection is read (in/after channelActive(...)).
.handle((in, out) -> {
Connection connection = (Connection) in;
// Note:
// 1. We need to trigger the read event manually here.
// Otherwise, it will never read the inbound stream from the peer
// because we don't subscribe to the inbound stream until we get the peer
// address.
// 2. Although "setAutoRead" seems just setting the "autoRead" flag to true,
// it also triggers the read event under the hood.
// 3. Don't move "setAutoRead" to the callback of "doOnChannelInit" because the
// channel is not ready yet, and "setAutoRead" will not work.
connection.channel()
.config()
.setAutoRead(true);
return remoteAddressSink.asMono()
.flatMap(remoteAddress -> connectionListener.onAdded(connection,
remoteAddress,
Expand All @@ -144,7 +156,8 @@ public static DisposableServer create(
});
SslProperties ssl = tcpProperties.getSsl();
if (ssl.isEnabled()) {
server.secure(spec -> SslUtil.configureSslContextSpec(spec, ssl, true));
server = server.secure(spec -> SslUtil
.configureSslContextSpec(spec, SslContextSpecType.TCP, ssl, true));
}
try {
return server.bind()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import im.turms.server.common.infra.healthcheck.ServerStatusManager;
import im.turms.server.common.infra.metrics.TurmsMicrometerChannelMetricsRecorder;
import im.turms.server.common.infra.net.BindException;
import im.turms.server.common.infra.net.SslContextSpecType;
import im.turms.server.common.infra.net.SslUtil;
import im.turms.server.common.infra.property.constant.RemoteAddressSourceHttpHeaderMode;
import im.turms.server.common.infra.property.constant.RemoteAddressSourceProxyProtocolMode;
Expand Down Expand Up @@ -135,7 +136,10 @@ public static DisposableServer create(
}
SslProperties ssl = webSocketProperties.getSsl();
if (ssl.isEnabled()) {
server.secure(spec -> SslUtil.configureSslContextSpec(spec, ssl, true), true);
server = server.secure(
spec -> SslUtil
.configureSslContextSpec(spec, SslContextSpecType.HTTP11, ssl, true),
true);
}
try {
return server.bind()
Expand Down Expand Up @@ -197,7 +201,7 @@ private static Publisher<Void> handleHttpRequest(
// Note that:
// 1. PingWebSocketFrame will be handled by Netty itself
// 2. The flatMap is called by FluxReceive, which will release buffer after
// "onNext" returns
// "onNext" returns.
.flatMap(frame -> frame instanceof BinaryWebSocketFrame
? Mono.just(frame.content())
: Mono.empty());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import im.turms.gateway.infra.ldap.handler.LdapMessageEncoder;
import im.turms.server.common.infra.logging.core.logger.Logger;
import im.turms.server.common.infra.logging.core.logger.LoggerFactory;
import im.turms.server.common.infra.net.SslContextSpecType;
import im.turms.server.common.infra.net.SslUtil;
import im.turms.server.common.infra.property.env.common.SslProperties;

Expand Down Expand Up @@ -113,8 +114,11 @@ public boolean isConnected() {
.port(port)
.metrics(true, () -> new MicrometerChannelMetricsRecorder(LDAP_CLIENT, "ldap"));
if (sslProperties != null && sslProperties.isEnabled()) {
client.secure(sslContextSpec -> SslUtil
.configureSslContextSpec(sslContextSpec, sslProperties, false));
client = client
.secure(sslContextSpec -> SslUtil.configureSslContextSpec(sslContextSpec,
SslContextSpecType.TCP,
sslProperties,
false));
}
return client.connect()
.map(conn -> {
Expand Down
Loading

0 comments on commit 47f1962

Please sign in to comment.