diff --git a/quickfixj-core/pom.xml b/quickfixj-core/pom.xml index fa7d97fc40..e5a8173b33 100644 --- a/quickfixj-core/pom.xml +++ b/quickfixj-core/pom.xml @@ -55,7 +55,12 @@ ${slf4j.version} test - + + io.netty + netty-example + 4.1.111.Final + test + org.apache.mina mina-core @@ -403,6 +408,15 @@ + + + + + kr.motd.maven + os-maven-plugin + 1.4.0.Final + + @@ -438,3 +452,4 @@ + diff --git a/quickfixj-core/src/main/java/quickfix/mina/CustomSslFilter.java b/quickfixj-core/src/main/java/quickfix/mina/CustomSslFilter.java new file mode 100644 index 0000000000..7092d6134d --- /dev/null +++ b/quickfixj-core/src/main/java/quickfix/mina/CustomSslFilter.java @@ -0,0 +1,35 @@ +package quickfix.mina; + +import org.apache.mina.core.filterchain.IoFilterChain; +import org.apache.mina.core.session.IoSession; +import org.apache.mina.filter.ssl.SslFilter; + +import javax.net.ssl.SSLContext; + +/** + * Temporary {@link SslFilter} wrapper that prevents auto connect for initiators. + */ +public class CustomSslFilter extends SslFilter { + + private static final boolean DEFAULT_AUTO_START = true; + + private final boolean autoStart; + + public CustomSslFilter(SSLContext sslContext) { + this(sslContext, DEFAULT_AUTO_START); + } + + public CustomSslFilter(SSLContext sslContext, boolean autoStart) { + super(sslContext); + this.autoStart = autoStart; + } + + @Override + public void onPostAdd(IoFilterChain parent, String name, NextFilter next) throws Exception { + IoSession session = parent.getSession(); + + if (session.isConnected() && autoStart) { + onConnected(next, session); + } + } +} diff --git a/quickfixj-core/src/main/java/quickfix/mina/ProtocolFactory.java b/quickfixj-core/src/main/java/quickfix/mina/ProtocolFactory.java index 13b88f0eec..ad53faee1c 100644 --- a/quickfixj-core/src/main/java/quickfix/mina/ProtocolFactory.java +++ b/quickfixj-core/src/main/java/quickfix/mina/ProtocolFactory.java @@ -52,7 +52,6 @@ public class ProtocolFactory { public final static int SOCKET = 0; public final static int VM_PIPE = 1; - public final static int PROXY = 2; public static String getTypeString(int type) { switch (type) { @@ -60,8 +59,6 @@ public static String getTypeString(int type) { return "SOCKET"; case VM_PIPE: return "VM_PIPE"; - case PROXY: - return "PROXY"; default: return "unknown"; } @@ -69,7 +66,7 @@ public static String getTypeString(int type) { public static SocketAddress createSocketAddress(int transportType, String host, int port) throws ConfigError { - if (transportType == SOCKET || transportType == PROXY) { + if (transportType == SOCKET) { return host != null ? new InetSocketAddress(host, port) : new InetSocketAddress(port); } else if (transportType == VM_PIPE) { return new VmPipeAddress(port); @@ -94,8 +91,6 @@ public static int getTransportType(String string) { return SOCKET; } else if (string.equalsIgnoreCase("VM_PIPE")) { return VM_PIPE; - } else if (string.equalsIgnoreCase("PROXY")) { - return PROXY; } else { throw new RuntimeError("Unknown Transport Type type: " + string); } diff --git a/quickfixj-core/src/main/java/quickfix/mina/acceptor/AbstractSocketAcceptor.java b/quickfixj-core/src/main/java/quickfix/mina/acceptor/AbstractSocketAcceptor.java index 22b112f27c..5374fc8b31 100644 --- a/quickfixj-core/src/main/java/quickfix/mina/acceptor/AbstractSocketAcceptor.java +++ b/quickfixj-core/src/main/java/quickfix/mina/acceptor/AbstractSocketAcceptor.java @@ -39,6 +39,7 @@ import quickfix.SessionID; import quickfix.SessionSettings; import quickfix.mina.CompositeIoFilterChainBuilder; +import quickfix.mina.CustomSslFilter; import quickfix.mina.EventHandlingStrategy; import quickfix.mina.NetworkingOptions; import quickfix.mina.ProtocolFactory; @@ -132,7 +133,7 @@ private void installSSL(AcceptorSocketDescriptor descriptor, log.info("Installing SSL filter for {}", descriptor.getAddress()); SSLConfig sslConfig = descriptor.getSslConfig(); SSLContext sslContext = SSLContextFactory.getInstance(sslConfig); - SslFilter sslFilter = new SslFilter(sslContext); + SslFilter sslFilter = new CustomSslFilter(sslContext); sslFilter.setNeedClientAuth(sslConfig.isNeedClientAuth()); sslFilter.setEnabledCipherSuites(sslConfig.getEnabledCipherSuites() != null ? sslConfig.getEnabledCipherSuites() : SSLSupport.getDefaultCipherSuites(sslContext)); diff --git a/quickfixj-core/src/main/java/quickfix/mina/initiator/IoSessionInitiator.java b/quickfixj-core/src/main/java/quickfix/mina/initiator/IoSessionInitiator.java index e3dbae2be2..fbb34a7a28 100644 --- a/quickfixj-core/src/main/java/quickfix/mina/initiator/IoSessionInitiator.java +++ b/quickfixj-core/src/main/java/quickfix/mina/initiator/IoSessionInitiator.java @@ -33,6 +33,7 @@ import quickfix.SessionID; import quickfix.SystemTime; import quickfix.mina.CompositeIoFilterChainBuilder; +import quickfix.mina.CustomSslFilter; import quickfix.mina.EventHandlingStrategy; import quickfix.mina.NetworkingOptions; import quickfix.mina.ProtocolFactory; @@ -186,7 +187,7 @@ private void setupIoConnector() throws ConfigError, GeneralSecurityException { private SslFilter installSslFilter(CompositeIoFilterChainBuilder ioFilterChainBuilder) throws GeneralSecurityException { final SSLContext sslContext = SSLContextFactory.getInstance(sslConfig); - final SslFilter sslFilter = new SslFilter(sslContext); + final SslFilter sslFilter = new CustomSslFilter(sslContext, false); sslFilter.setEnabledCipherSuites(sslConfig.getEnabledCipherSuites() != null ? sslConfig.getEnabledCipherSuites() : SSLSupport.getDefaultCipherSuites(sslContext)); sslFilter.setEnabledProtocols(sslConfig.getEnabledProtocols() != null ? sslConfig.getEnabledProtocols() diff --git a/quickfixj-core/src/test/java/quickfix/mina/SocksProxyServer.java b/quickfixj-core/src/test/java/quickfix/mina/SocksProxyServer.java new file mode 100644 index 0000000000..07faf1e048 --- /dev/null +++ b/quickfixj-core/src/test/java/quickfix/mina/SocksProxyServer.java @@ -0,0 +1,74 @@ +package quickfix.mina; + +import io.netty.bootstrap.ServerBootstrap; +import io.netty.channel.Channel; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.nio.NioServerSocketChannel; +import io.netty.example.socksproxy.SocksServerInitializer; +import io.netty.handler.logging.LogLevel; +import io.netty.handler.logging.LoggingHandler; +import org.apache.mina.util.DaemonThreadFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.ThreadFactory; + +/** + * Simple SOCKS proxy server based on Netty examples. Only SOCKS protocols are currently supported. + * The implementation performs the proxy handshake, but it doesn't perform any user authentication. + */ +public class SocksProxyServer { + + private static final Logger LOGGER = LoggerFactory.getLogger(SocksProxyServer.class); + private static final ThreadFactory THREAD_FACTORY = new DaemonThreadFactory(); + + private final ServerBootstrap bootstrap; + private final int port; + private Channel channel; + + public SocksProxyServer(int port) { + this.bootstrap = new ServerBootstrap(); + this.bootstrap.group(new NioEventLoopGroup(THREAD_FACTORY), new NioEventLoopGroup(THREAD_FACTORY)) + .channel(NioServerSocketChannel.class) + .handler(new LoggingHandler(LogLevel.DEBUG)) + .childHandler(new SocksServerInitializer()); + this.port = port; + } + + public synchronized void start() { + if (channel != null) { + throw new IllegalStateException("SOCKS proxy server is running already"); + } + + try { + channel = bootstrap.bind(port) + .sync() + .channel(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException(e); + } + + LOGGER.info("SOCKS proxy server started at port: {}", port); + } + + public synchronized void stop() { + if (channel == null) { + throw new IllegalStateException("SOCKS proxy server is not running"); + } + + try { + channel.close().sync(); + channel = null; + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException("Failed to close SOCKS proxy server"); + } + + LOGGER.info("SOCKS proxy server stopped at port {}", port); + } + + public int getPort() { + return port; + } +} diff --git a/quickfixj-core/src/test/java/quickfix/mina/SocksProxyTest.java b/quickfixj-core/src/test/java/quickfix/mina/SocksProxyTest.java new file mode 100644 index 0000000000..1403da9bd7 --- /dev/null +++ b/quickfixj-core/src/test/java/quickfix/mina/SocksProxyTest.java @@ -0,0 +1,177 @@ +package quickfix.mina; + +import org.apache.mina.util.AvailablePortFinder; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import quickfix.Acceptor; +import quickfix.ApplicationAdapter; +import quickfix.ConfigError; +import quickfix.DefaultMessageFactory; +import quickfix.FixVersions; +import quickfix.Initiator; +import quickfix.MemoryStoreFactory; +import quickfix.MessageFactory; +import quickfix.MessageStoreFactory; +import quickfix.Session; +import quickfix.SessionFactory; +import quickfix.SessionID; +import quickfix.SessionSettings; +import quickfix.ThreadedSocketAcceptor; +import quickfix.ThreadedSocketInitiator; + +import java.util.HashMap; +import java.util.concurrent.TimeUnit; + +/** + * Performs end to end tests against SOCKS proxy server. + */ +public class SocksProxyTest { + + // maximum time to wait for session logon + private static final long TIMEOUT_SECONDS = 5; + + private SocksProxyServer proxyServer; + + @Before + public void setUp() { + int proxyPort = AvailablePortFinder.getNextAvailable(); + + proxyServer = new SocksProxyServer(proxyPort); + proxyServer.start(); + } + + @After + public void tearDown() { + proxyServer.stop(); + } + + @Test + public void shouldLoginViaSocks4Proxy() throws ConfigError { + shouldLoginSocksProxy("4"); + } + + @Test + public void shouldLoginViaSocks4aProxy() throws ConfigError { + shouldLoginSocksProxy("4a"); + } + + @Test + public void shouldLoginViaSocks5Proxy() throws ConfigError { + shouldLoginSocksProxy("5"); + } + + private void shouldLoginSocksProxy(String proxyVersion) throws ConfigError { + int port = AvailablePortFinder.getNextAvailable(); + SessionConnector acceptor = createAcceptor(port); + + try { + acceptor.start(); + + SessionConnector initiator = createInitiator(proxyVersion, proxyServer.getPort(), port); + + try { + initiator.start(); + assertLoggedOn(acceptor, new SessionID(FixVersions.BEGINSTRING_FIX44, "ALICE", "BOB")); + assertLoggedOn(initiator, new SessionID(FixVersions.BEGINSTRING_FIX44, "BOB", "ALICE")); + } finally { + initiator.stop(); + } + } finally { + acceptor.stop(); + } + } + + private void assertLoggedOn(SessionConnector connector, SessionID sessionID) { + long startTimeNanos = System.nanoTime(); + + while (TimeUnit.NANOSECONDS.toSeconds(System.nanoTime() - startTimeNanos) < TIMEOUT_SECONDS) { + if (isLoggedOn(connector, sessionID)) { + return; + } + + try { + Thread.sleep(100); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException("Interrupted", e); + } + } + + throw new AssertionError("Session " + sessionID + " is not logged on"); + } + + private boolean isLoggedOn(SessionConnector connector, SessionID sessionID) { + Session session = connector.getSessionMap().get(sessionID); + + if (session == null) { + return false; + } + + return session.isLoggedOn(); + } + + private SessionConnector createAcceptor(int port) throws ConfigError { + MessageStoreFactory messageStoreFactory = new MemoryStoreFactory(); + MessageFactory messageFactory = new DefaultMessageFactory(); + SessionSettings acceptorSettings = createAcceptorSettings("ALICE", "BOB", port); + return new ThreadedSocketAcceptor(new ApplicationAdapter(), messageStoreFactory, acceptorSettings, messageFactory); + } + + private SessionConnector createInitiator(String proxyVersion, int proxyPort, int port) throws ConfigError { + MessageStoreFactory messageStoreFactory = new MemoryStoreFactory(); + MessageFactory messageFactory = new DefaultMessageFactory(); + SessionSettings initiatorSettings = createInitiatorSettings("BOB", "ALICE", proxyVersion, proxyPort, port); + return new ThreadedSocketInitiator(new ApplicationAdapter(), messageStoreFactory, initiatorSettings, messageFactory); + } + + private SessionSettings createAcceptorSettings(String senderId, String targetId, int port) { + HashMap defaults = new HashMap<>(); + defaults.put(SessionFactory.SETTING_CONNECTION_TYPE, "acceptor"); + defaults.put(Acceptor.SETTING_SOCKET_ACCEPT_PORT, Integer.toString(port)); + defaults.put(Session.SETTING_START_TIME, "00:00:00"); + defaults.put(Session.SETTING_END_TIME, "00:00:00"); + defaults.put(Session.SETTING_HEARTBTINT, "30"); + + SessionID sessionID = new SessionID(FixVersions.BEGINSTRING_FIX44, senderId, targetId); + + SessionSettings sessionSettings = new SessionSettings(); + sessionSettings.set(defaults); + sessionSettings.setString(sessionID, "BeginString", FixVersions.BEGINSTRING_FIX44); + sessionSettings.setString(sessionID, "DataDictionary", "FIX44.xml"); + sessionSettings.setString(sessionID, "SenderCompID", senderId); + sessionSettings.setString(sessionID, "TargetCompID", targetId); + + return sessionSettings; + } + + private SessionSettings createInitiatorSettings(String senderId, String targetId, String proxyVersion, int proxyPort, int port) { + HashMap defaults = new HashMap<>(); + defaults.put(SessionFactory.SETTING_CONNECTION_TYPE, "initiator"); + defaults.put(Initiator.SETTING_SOCKET_CONNECT_PROTOCOL, ProtocolFactory.getTypeString(ProtocolFactory.SOCKET)); + defaults.put(Initiator.SETTING_SOCKET_CONNECT_HOST, "localhost"); + defaults.put(Initiator.SETTING_SOCKET_CONNECT_PORT, Integer.toString(port)); + defaults.put(Initiator.SETTING_RECONNECT_INTERVAL, "2"); + defaults.put(Initiator.SETTING_PROXY_HOST, "localhost"); + defaults.put(Initiator.SETTING_PROXY_PORT, Integer.toString(proxyPort)); + defaults.put(Initiator.SETTING_PROXY_TYPE, "socks"); + defaults.put(Initiator.SETTING_PROXY_VERSION, proxyVersion); + defaults.put(Initiator.SETTING_PROXY_USER, "proxy-user"); + defaults.put(Initiator.SETTING_PROXY_PASSWORD, "proxy-password"); + defaults.put(Session.SETTING_START_TIME, "00:00:00"); + defaults.put(Session.SETTING_END_TIME, "00:00:00"); + defaults.put(Session.SETTING_HEARTBTINT, "30"); + + SessionID sessionID = new SessionID(FixVersions.BEGINSTRING_FIX44, senderId, targetId); + + SessionSettings sessionSettings = new SessionSettings(); + sessionSettings.set(defaults); + sessionSettings.setString(sessionID, "BeginString", FixVersions.BEGINSTRING_FIX44); + sessionSettings.setString(sessionID, "DataDictionary", "FIX44.xml"); + sessionSettings.setString(sessionID, "SenderCompID", senderId); + sessionSettings.setString(sessionID, "TargetCompID", targetId); + + return sessionSettings; + } + +} diff --git a/quickfixj-core/src/test/java/quickfix/mina/ssl/SSLCertificateTest.java b/quickfixj-core/src/test/java/quickfix/mina/ssl/SSLCertificateTest.java index f5ff7cb145..74cf869961 100644 --- a/quickfixj-core/src/test/java/quickfix/mina/ssl/SSLCertificateTest.java +++ b/quickfixj-core/src/test/java/quickfix/mina/ssl/SSLCertificateTest.java @@ -31,6 +31,7 @@ import quickfix.ConfigError; import quickfix.DefaultMessageFactory; import quickfix.FixVersions; +import quickfix.Initiator; import quickfix.MemoryStoreFactory; import quickfix.MessageFactory; import quickfix.MessageStoreFactory; @@ -52,12 +53,14 @@ import java.security.cert.Certificate; import java.security.cert.X509Certificate; import java.util.HashMap; +import java.util.Properties; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import java.util.logging.Level; import org.apache.mina.util.AvailablePortFinder; import org.junit.After; +import quickfix.mina.SocksProxyServer; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertSame; @@ -108,6 +111,70 @@ public void shouldAuthenticateServerCertificate() throws Exception { } } + @Test + public void shouldLoginViaSocks4Proxy() throws Exception { + shouldAuthenticateServerCertificateViaSocksProxy("4"); + } + + @Test + public void shouldLoginViaSocks4aProxy() throws Exception { + shouldAuthenticateServerCertificateViaSocksProxy("4a"); + } + + @Test + public void shouldLoginViaSocks5Proxy() throws Exception { + shouldAuthenticateServerCertificateViaSocksProxy("5"); + } + + public void shouldAuthenticateServerCertificateViaSocksProxy(String proxyVersion) throws Exception { + int proxyPort = AvailablePortFinder.getNextAvailable(); + + SocksProxyServer proxyServer = new SocksProxyServer(proxyPort); + proxyServer.start(); + + try { + int port = AvailablePortFinder.getNextAvailable(); + TestAcceptor acceptor = new TestAcceptor(createAcceptorSettings("single-session/server.keystore", false, + "single-session/empty.keystore", CIPHER_SUITES_TLS, "TLSv1.2", "JKS", "JKS", port)); + + try { + acceptor.start(); + + SessionSettings initiatorSettings = createInitiatorSettings("single-session/empty.keystore", "single-session/client.truststore", + CIPHER_SUITES_TLS, "TLSv1.2", "ZULU", "ALFA", Integer.toString(port), "JKS", "JKS"); + + Properties defaults = initiatorSettings.getDefaultProperties(); + + defaults.put(Initiator.SETTING_PROXY_HOST, "localhost"); + defaults.put(Initiator.SETTING_PROXY_PORT, Integer.toString(proxyPort)); + defaults.put(Initiator.SETTING_PROXY_TYPE, "socks"); + defaults.put(Initiator.SETTING_PROXY_VERSION, proxyVersion); + defaults.put(Initiator.SETTING_PROXY_USER, "proxy-user"); + defaults.put(Initiator.SETTING_PROXY_PASSWORD, "proxy-password"); + + TestInitiator initiator = new TestInitiator(initiatorSettings); + + try { + initiator.start(); + + initiator.assertNoSslExceptionThrown(); + initiator.assertLoggedOn(new SessionID(FixVersions.BEGINSTRING_FIX44, "ZULU", "ALFA")); + initiator.assertAuthenticated(new SessionID(FixVersions.BEGINSTRING_FIX44, "ZULU", "ALFA"), new BigInteger("1448538842")); + + acceptor.assertNoSslExceptionThrown(); + acceptor.assertLoggedOn(new SessionID(FixVersions.BEGINSTRING_FIX44, "ALFA", "ZULU")); + acceptor.assertNotAuthenticated(new SessionID(FixVersions.BEGINSTRING_FIX44, "ALFA", "ZULU")); + } finally { + initiator.stop(); + } + } finally { + acceptor.stop(); + } + } finally { + proxyServer.stop(); + } + } + /** * Server certificate has Common Name = localhost and no Server Alternative Name extension. */